From 26a0d8bdbe7a15b9859185dd98b8ed96d2c838e4 Mon Sep 17 00:00:00 2001 From: 0xdcarns Date: Tue, 25 Jan 2022 22:14:31 -0500 Subject: [PATCH 1/6] initial commit --- controllers/node_grpc.go | 5 ++ logic/nodes.go | 15 ++++++ logic/peers.go | 5 ++ models/mqtt.go | 11 ++-- models/network.go | 9 ++-- netclient/functions/daemon.go | 78 ++++++++++++++++++++++++++++- netclient/ncutils/netclientutils.go | 10 ++++ 7 files changed, 123 insertions(+), 10 deletions(-) diff --git a/controllers/node_grpc.go b/controllers/node_grpc.go index c2fe6f19..b0130b79 100644 --- a/controllers/node_grpc.go +++ b/controllers/node_grpc.go @@ -67,6 +67,11 @@ func (s *NodeServiceServer) CreateNode(ctx context.Context, req *nodepb.Object) } } + var serverNodes = logic.GetServerNodes(node.Network) + for _, server := range serverNodes { + node.NetworkSettings.DefaultServerAddrs = append(node.NetworkSettings.DefaultServerAddrs, server.Address) + } + err = logic.CreateNode(&node) if err != nil { return nil, err diff --git a/logic/nodes.go b/logic/nodes.go index 48008db3..f6f74c4d 100644 --- a/logic/nodes.go +++ b/logic/nodes.go @@ -58,6 +58,21 @@ func GetSortedNetworkServerNodes(network string) ([]models.Node, error) { return nodes, nil } +// GetServerNodes - gets the server nodes of a network +func GetServerNodes(network string) []models.Node { + var nodes, err = GetNetworkNodes(network) + var serverNodes = make([]models.Node, 0) + if err != nil { + return serverNodes + } + for _, node := range nodes { + if node.IsServer == "yes" { + serverNodes = append(serverNodes, node) + } + } + return serverNodes +} + // UncordonNode - approves a node to join a network func UncordonNode(nodeid string) (models.Node, error) { node, err := GetNodeByID(nodeid) diff --git a/logic/peers.go b/logic/peers.go index 311ee2d6..caa0da25 100644 --- a/logic/peers.go +++ b/logic/peers.go @@ -20,6 +20,7 @@ func GetPeerUpdate(node *models.Node) (models.PeerUpdate, error) { if err != nil { return models.PeerUpdate{}, err } + var serverNodeAddresses = []string{} for _, peer := range networkNodes { if peer.ID == node.ID { //skip yourself @@ -55,9 +56,13 @@ func GetPeerUpdate(node *models.Node) (models.PeerUpdate, error) { PersistentKeepaliveInterval: &keepalive, } peers = append(peers, peerData) + if peer.IsServer == "yes" { + serverNodeAddresses = append(serverNodeAddresses, peer.Address) + } } peerUpdate.Network = node.Network peerUpdate.Peers = peers + peerUpdate.ServerAddrs = serverNodeAddresses return peerUpdate, nil } diff --git a/models/mqtt.go b/models/mqtt.go index 3ccab619..10b8d063 100644 --- a/models/mqtt.go +++ b/models/mqtt.go @@ -2,12 +2,15 @@ package models import "golang.zx2c4.com/wireguard/wgctrl/wgtypes" +// PeerUpdate - struct type PeerUpdate struct { - Network string - Peers []wgtypes.PeerConfig + Network string `json:"network" bson:"network"` + ServerAddrs []string `json:"serversaddrs" bson:"serversaddrs"` + Peers []wgtypes.PeerConfig `json:"peers" bson:"peers"` } +// KeyUpdate - key update struct type KeyUpdate struct { - Network string - Interface string + Network string `json:"network" bson:"network"` + Interface string `json:"interface" bson:"interface"` } diff --git a/models/network.go b/models/network.go index c22dca0f..492b61d0 100644 --- a/models/network.go +++ b/models/network.go @@ -34,10 +34,11 @@ type Network struct { LocalRange string `json:"localrange" bson:"localrange" validate:"omitempty,cidr"` // checkin interval is depreciated at the network level. Set on server with CHECKIN_INTERVAL - DefaultCheckInInterval int32 `json:"checkininterval,omitempty" bson:"checkininterval,omitempty" validate:"omitempty,numeric,min=2,max=100000"` - DefaultUDPHolePunch string `json:"defaultudpholepunch" bson:"defaultudpholepunch" validate:"checkyesorno"` - DefaultExtClientDNS string `json:"defaultextclientdns" bson:"defaultextclientdns"` - DefaultMTU int32 `json:"defaultmtu" bson:"defaultmtu"` + DefaultCheckInInterval int32 `json:"checkininterval,omitempty" bson:"checkininterval,omitempty" validate:"omitempty,numeric,min=2,max=100000"` + DefaultUDPHolePunch string `json:"defaultudpholepunch" bson:"defaultudpholepunch" validate:"checkyesorno"` + DefaultExtClientDNS string `json:"defaultextclientdns" bson:"defaultextclientdns"` + DefaultMTU int32 `json:"defaultmtu" bson:"defaultmtu"` + DefaultServerAddrs []string `json:"defaultserveraddrs" bson:"defaultserveraddrs"` } // SaveData - sensitive fields of a network that should be kept the same diff --git a/netclient/functions/daemon.go b/netclient/functions/daemon.go index 1fbc51f7..d7c41e9f 100644 --- a/netclient/functions/daemon.go +++ b/netclient/functions/daemon.go @@ -3,10 +3,12 @@ package functions import ( "context" "encoding/json" + "fmt" "log" "os" "os/signal" "runtime" + "sync" "syscall" "time" @@ -19,6 +21,22 @@ import ( "golang.zx2c4.com/wireguard/wgctrl/wgtypes" ) +var messageCache = make(map[string]string, 20) + +const lastNodeUpdate = "lnu" +const lastPeerUpdate = "lpu" + +func insert(network, which, cache string) { + var mu sync.Mutex + mu.Lock() + defer mu.Unlock() + messageCache[fmt.Sprintf("%s%s", network, which)] = cache +} + +func read(network, which string) string { + return messageCache[fmt.Sprintf("%s%s", network, which)] +} + // Daemon runs netclient daemon from command line func Daemon() error { ctx, cancel := context.WithCancel(context.Background()) @@ -41,8 +59,12 @@ func Daemon() error { // SetupMQTT creates a connection to broker and return client func SetupMQTT(cfg *config.ClientConfig) mqtt.Client { opts := mqtt.NewClientOptions() - ncutils.Log("setting broker to " + cfg.Server.CoreDNSAddr + ":1883") - opts.AddBroker(cfg.Server.CoreDNSAddr + ":1883") + for i, addr := range cfg.Node.NetworkSettings.DefaultServerAddrs { + if addr != "" { + ncutils.Log(fmt.Sprintf("adding server (%d) to listen on network %s \n", (i + 1), cfg.Node.Network)) + opts.AddBroker(addr + ":1883") + } + } opts.SetDefaultPublishHandler(All) client := mqtt.NewClient(opts) if token := client.Connect(); token.Wait() && token.Error() != nil { @@ -102,6 +124,12 @@ var NodeUpdate mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) ncutils.Log("error unmarshalling node update data" + err.Error()) return } + // see if cache hit, if so skip + var currentMessage = read(newNode.Network, lastNodeUpdate) + if currentMessage == string(msg.Payload()) { + return + } + insert(newNode.Network, lastNodeUpdate, string(msg.Payload())) cfg.Network = newNode.Network cfg.ReadConfig() //check if interface name has changed if so delete. @@ -177,10 +205,24 @@ var UpdatePeers mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) ncutils.Log("error unmarshalling peer data") return } + // see if cache hit, if so skip + var currentMessage = read(peerUpdate.Network, lastPeerUpdate) + if currentMessage == string(msg.Payload()) { + return + } + insert(peerUpdate.Network, lastPeerUpdate, string(msg.Payload())) ncutils.Log("update peer handler") var cfg config.ClientConfig cfg.Network = peerUpdate.Network cfg.ReadConfig() + var shouldReSub = shouldResub(cfg.Node.NetworkSettings.DefaultServerAddrs, peerUpdate.ServerAddrs) + if shouldReSub { + client.Disconnect(250) // kill client + // un sub, re sub.. how? + client.Unsubscribe("update/"+cfg.Node.ID, "update/peers/"+cfg.Node.ID) + cfg.Node.NetworkSettings.DefaultServerAddrs = peerUpdate.ServerAddrs + + } file := ncutils.GetNetclientPathSpecific() + cfg.Node.Interface + ".conf" err = wireguard.UpdateWgPeers(file, peerUpdate.Peers) if err != nil { @@ -196,6 +238,26 @@ var UpdatePeers mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) }() } +// Resubscribe --- handles resubscribing if needed +func Resubscribe(client mqtt.Client, cfg *config.ClientConfig) { + if err := config.ModConfig(&cfg.Node); err == nil { + client.Disconnect(250) + client = SetupMQTT(cfg) + if token := client.Subscribe("update/"+cfg.Node.ID, 0, NodeUpdate); token.Wait() && token.Error() != nil { + log.Fatal(token.Error()) + } + if cfg.DebugOn { + ncutils.Log("subscribed to node updates for node " + cfg.Node.Name + " update/" + cfg.Node.ID) + } + if token := client.Subscribe("update/peers/"+cfg.Node.ID, 0, UpdatePeers); token.Wait() && token.Error() != nil { + log.Fatal(token.Error()) + } + ncutils.Log("finished re subbing") + } else { + ncutils.Log("could not mod config when re-subbing") + } +} + // UpdateKeys -- updates private key and returns new publickey func UpdateKeys(cfg *config.ClientConfig, client mqtt.Client) error { ncutils.Log("received message to update keys") @@ -291,3 +353,15 @@ func Hello(cfg *config.ClientConfig, network string) { } client.Disconnect(250) } + +func shouldResub(currentServers, newServers []string) bool { + if len(currentServers) != len(newServers) { + return false + } + for _, srv := range currentServers { + if !ncutils.StringSliceContains(newServers, srv) { + return true + } + } + return false +} diff --git a/netclient/ncutils/netclientutils.go b/netclient/ncutils/netclientutils.go index 0bfff3c3..b1957e51 100644 --- a/netclient/ncutils/netclientutils.go +++ b/netclient/ncutils/netclientutils.go @@ -532,3 +532,13 @@ func CheckWG() { log.Println("running userspace WireGuard with " + uspace) } } + +// StringSliceContains - sees if a string slice contains a string element +func StringSliceContains(slice []string, item string) bool { + for _, s := range slice { + if s == item { + return true + } + } + return false +} From 0d3813295d5055d92691ffbe08534b74d0fba3f2 Mon Sep 17 00:00:00 2001 From: 0xdcarns Date: Tue, 25 Jan 2022 23:04:03 -0500 Subject: [PATCH 2/6] server update --- controllers/node_grpc.go | 7 +++++-- models/network.go | 10 +++++----- netclient/config/config.go | 1 + netclient/functions/daemon.go | 9 +++++---- netclient/functions/join.go | 2 ++ 5 files changed, 18 insertions(+), 11 deletions(-) diff --git a/controllers/node_grpc.go b/controllers/node_grpc.go index b0130b79..53bb8d99 100644 --- a/controllers/node_grpc.go +++ b/controllers/node_grpc.go @@ -68,8 +68,11 @@ func (s *NodeServiceServer) CreateNode(ctx context.Context, req *nodepb.Object) } var serverNodes = logic.GetServerNodes(node.Network) - for _, server := range serverNodes { - node.NetworkSettings.DefaultServerAddrs = append(node.NetworkSettings.DefaultServerAddrs, server.Address) + for i, server := range serverNodes { + node.NetworkSettings.DefaultServerAddrs += server.Address + if i < len(serverNodes)-1 { + node.NetworkSettings.DefaultServerAddrs += "," + } } err = logic.CreateNode(&node) diff --git a/models/network.go b/models/network.go index 492b61d0..0103199b 100644 --- a/models/network.go +++ b/models/network.go @@ -34,11 +34,11 @@ type Network struct { LocalRange string `json:"localrange" bson:"localrange" validate:"omitempty,cidr"` // checkin interval is depreciated at the network level. Set on server with CHECKIN_INTERVAL - DefaultCheckInInterval int32 `json:"checkininterval,omitempty" bson:"checkininterval,omitempty" validate:"omitempty,numeric,min=2,max=100000"` - DefaultUDPHolePunch string `json:"defaultudpholepunch" bson:"defaultudpholepunch" validate:"checkyesorno"` - DefaultExtClientDNS string `json:"defaultextclientdns" bson:"defaultextclientdns"` - DefaultMTU int32 `json:"defaultmtu" bson:"defaultmtu"` - DefaultServerAddrs []string `json:"defaultserveraddrs" bson:"defaultserveraddrs"` + DefaultCheckInInterval int32 `json:"checkininterval,omitempty" bson:"checkininterval,omitempty" validate:"omitempty,numeric,min=2,max=100000"` + DefaultUDPHolePunch string `json:"defaultudpholepunch" bson:"defaultudpholepunch" validate:"checkyesorno"` + DefaultExtClientDNS string `json:"defaultextclientdns" bson:"defaultextclientdns"` + DefaultMTU int32 `json:"defaultmtu" bson:"defaultmtu"` + DefaultServerAddrs string `json:"defaultserveraddrs" bson:"defaultserveraddrs" yaml:"defaultserveraddrs"` } // SaveData - sensitive fields of a network that should be kept the same diff --git a/netclient/config/config.go b/netclient/config/config.go index 66d19762..fb602bfa 100644 --- a/netclient/config/config.go +++ b/netclient/config/config.go @@ -119,6 +119,7 @@ func ModConfig(node *models.Node) error { modconfig.Node = (*node) modconfig.NetworkSettings = node.NetworkSettings + log.Printf("%v \n", modconfig) err = Write(&modconfig, network) return err } diff --git a/netclient/functions/daemon.go b/netclient/functions/daemon.go index d7c41e9f..4f01475b 100644 --- a/netclient/functions/daemon.go +++ b/netclient/functions/daemon.go @@ -8,6 +8,7 @@ import ( "os" "os/signal" "runtime" + "strings" "sync" "syscall" "time" @@ -59,7 +60,8 @@ func Daemon() error { // SetupMQTT creates a connection to broker and return client func SetupMQTT(cfg *config.ClientConfig) mqtt.Client { opts := mqtt.NewClientOptions() - for i, addr := range cfg.Node.NetworkSettings.DefaultServerAddrs { + serverAddrs := strings.Split(cfg.Node.NetworkSettings.DefaultServerAddrs, ",") + for i, addr := range serverAddrs { if addr != "" { ncutils.Log(fmt.Sprintf("adding server (%d) to listen on network %s \n", (i + 1), cfg.Node.Network)) opts.AddBroker(addr + ":1883") @@ -215,13 +217,12 @@ var UpdatePeers mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) var cfg config.ClientConfig cfg.Network = peerUpdate.Network cfg.ReadConfig() - var shouldReSub = shouldResub(cfg.Node.NetworkSettings.DefaultServerAddrs, peerUpdate.ServerAddrs) + var shouldReSub = shouldResub(strings.Split(cfg.Node.NetworkSettings.DefaultServerAddrs, ","), peerUpdate.ServerAddrs) if shouldReSub { client.Disconnect(250) // kill client // un sub, re sub.. how? client.Unsubscribe("update/"+cfg.Node.ID, "update/peers/"+cfg.Node.ID) - cfg.Node.NetworkSettings.DefaultServerAddrs = peerUpdate.ServerAddrs - + cfg.Node.NetworkSettings.DefaultServerAddrs = strings.Join(peerUpdate.ServerAddrs, ",") } file := ncutils.GetNetclientPathSpecific() + cfg.Node.Interface + ".conf" err = wireguard.UpdateWgPeers(file, peerUpdate.Peers) diff --git a/netclient/functions/join.go b/netclient/functions/join.go index f5c38c85..b37bf24c 100644 --- a/netclient/functions/join.go +++ b/netclient/functions/join.go @@ -161,6 +161,7 @@ func JoinNetwork(cfg config.ClientConfig, privateKey string) error { if err = json.Unmarshal([]byte(nodeData), &node); err != nil { return err } + log.Printf("%v \n", nodeData) } // get free port based on returned default listen port @@ -183,6 +184,7 @@ func JoinNetwork(cfg config.ClientConfig, privateKey string) error { } if node.IsServer != "yes" { // == handle client side == + cfg.Node = node err = config.ModConfig(&node) if err != nil { return err From cd4e2c57d6bcdce77902f2dc9d66febe2545076e Mon Sep 17 00:00:00 2001 From: 0xdcarns Date: Wed, 26 Jan 2022 10:40:39 -0500 Subject: [PATCH 3/6] added new struct and adapted to funcs --- controllers/node_grpc.go | 8 ++++--- logic/peers.go | 4 ++-- models/mqtt.go | 6 ++--- models/network.go | 10 ++++----- models/structs.go | 6 +++++ netclient/functions/daemon.go | 35 ++++++++++++++--------------- netclient/ncutils/netclientutils.go | 7 +++--- 7 files changed, 42 insertions(+), 34 deletions(-) diff --git a/controllers/node_grpc.go b/controllers/node_grpc.go index 53bb8d99..8edebf5f 100644 --- a/controllers/node_grpc.go +++ b/controllers/node_grpc.go @@ -68,12 +68,14 @@ func (s *NodeServiceServer) CreateNode(ctx context.Context, req *nodepb.Object) } var serverNodes = logic.GetServerNodes(node.Network) + var serverAddrs = make([]models.ServerAddr, len(serverNodes)) for i, server := range serverNodes { - node.NetworkSettings.DefaultServerAddrs += server.Address - if i < len(serverNodes)-1 { - node.NetworkSettings.DefaultServerAddrs += "," + serverAddrs[i] = models.ServerAddr{ + IsLeader: logic.IsLeader(&server), + Address: server.Address, } } + node.NetworkSettings.DefaultServerAddrs = serverAddrs err = logic.CreateNode(&node) if err != nil { diff --git a/logic/peers.go b/logic/peers.go index caa0da25..44e4ae5c 100644 --- a/logic/peers.go +++ b/logic/peers.go @@ -20,7 +20,7 @@ func GetPeerUpdate(node *models.Node) (models.PeerUpdate, error) { if err != nil { return models.PeerUpdate{}, err } - var serverNodeAddresses = []string{} + var serverNodeAddresses = []models.ServerAddr{} for _, peer := range networkNodes { if peer.ID == node.ID { //skip yourself @@ -57,7 +57,7 @@ func GetPeerUpdate(node *models.Node) (models.PeerUpdate, error) { } peers = append(peers, peerData) if peer.IsServer == "yes" { - serverNodeAddresses = append(serverNodeAddresses, peer.Address) + serverNodeAddresses = append(serverNodeAddresses, models.ServerAddr{IsLeader: IsLeader(&peer), Address: peer.Address}) } } peerUpdate.Network = node.Network diff --git a/models/mqtt.go b/models/mqtt.go index 10b8d063..e9bead41 100644 --- a/models/mqtt.go +++ b/models/mqtt.go @@ -4,9 +4,9 @@ import "golang.zx2c4.com/wireguard/wgctrl/wgtypes" // PeerUpdate - struct type PeerUpdate struct { - Network string `json:"network" bson:"network"` - ServerAddrs []string `json:"serversaddrs" bson:"serversaddrs"` - Peers []wgtypes.PeerConfig `json:"peers" bson:"peers"` + Network string `json:"network" bson:"network" yaml:"network"` + ServerAddrs []ServerAddr `json:"serveraddrs" bson:"serveraddrs" yaml:"serveraddrs"` + Peers []wgtypes.PeerConfig `json:"peers" bson:"peers" yaml:"peers"` } // KeyUpdate - key update struct diff --git a/models/network.go b/models/network.go index 0103199b..690faeb8 100644 --- a/models/network.go +++ b/models/network.go @@ -34,11 +34,11 @@ type Network struct { LocalRange string `json:"localrange" bson:"localrange" validate:"omitempty,cidr"` // checkin interval is depreciated at the network level. Set on server with CHECKIN_INTERVAL - DefaultCheckInInterval int32 `json:"checkininterval,omitempty" bson:"checkininterval,omitempty" validate:"omitempty,numeric,min=2,max=100000"` - DefaultUDPHolePunch string `json:"defaultudpholepunch" bson:"defaultudpholepunch" validate:"checkyesorno"` - DefaultExtClientDNS string `json:"defaultextclientdns" bson:"defaultextclientdns"` - DefaultMTU int32 `json:"defaultmtu" bson:"defaultmtu"` - DefaultServerAddrs string `json:"defaultserveraddrs" bson:"defaultserveraddrs" yaml:"defaultserveraddrs"` + DefaultCheckInInterval int32 `json:"checkininterval,omitempty" bson:"checkininterval,omitempty" validate:"omitempty,numeric,min=2,max=100000"` + DefaultUDPHolePunch string `json:"defaultudpholepunch" bson:"defaultudpholepunch" validate:"checkyesorno"` + DefaultExtClientDNS string `json:"defaultextclientdns" bson:"defaultextclientdns"` + DefaultMTU int32 `json:"defaultmtu" bson:"defaultmtu"` + DefaultServerAddrs []ServerAddr `json:"defaultserveraddrs" bson:"defaultserveraddrs" yaml:"defaultserveraddrs"` } // SaveData - sensitive fields of a network that should be kept the same diff --git a/models/structs.go b/models/structs.go index 3e772bc3..3020c1bd 100644 --- a/models/structs.go +++ b/models/structs.go @@ -169,3 +169,9 @@ type Telemetry struct { UUID string `json:"uuid" bson:"uuid"` LastSend int64 `json:"lastsend" bson:"lastsend"` } + +// ServerAddr - to pass to clients to tell server addresses and if it's the leader or not +type ServerAddr struct { + IsLeader bool `json:"isleader" bson:"isleader" yaml:"isleader"` + Address string `json:"address" bson:"address" yaml:"address"` +} diff --git a/netclient/functions/daemon.go b/netclient/functions/daemon.go index 4f01475b..56b35265 100644 --- a/netclient/functions/daemon.go +++ b/netclient/functions/daemon.go @@ -8,7 +8,6 @@ import ( "os" "os/signal" "runtime" - "strings" "sync" "syscall" "time" @@ -60,11 +59,10 @@ func Daemon() error { // SetupMQTT creates a connection to broker and return client func SetupMQTT(cfg *config.ClientConfig) mqtt.Client { opts := mqtt.NewClientOptions() - serverAddrs := strings.Split(cfg.Node.NetworkSettings.DefaultServerAddrs, ",") - for i, addr := range serverAddrs { - if addr != "" { + for i, server := range cfg.Node.NetworkSettings.DefaultServerAddrs { + if server.Address != "" && server.IsLeader { ncutils.Log(fmt.Sprintf("adding server (%d) to listen on network %s \n", (i + 1), cfg.Node.Network)) - opts.AddBroker(addr + ":1883") + opts.AddBroker(server.Address + ":1883") } } opts.SetDefaultPublishHandler(All) @@ -89,13 +87,13 @@ func MessageQueue(ctx context.Context, network string) { } ncutils.Log("subscribed to all topics for debugging purposes") } - if token := client.Subscribe("update/"+cfg.Node.ID, 0, NodeUpdate); token.Wait() && token.Error() != nil { + if token := client.Subscribe("update/"+cfg.Node.ID, 0, mqtt.MessageHandler(NodeUpdate)); token.Wait() && token.Error() != nil { log.Fatal(token.Error()) } if cfg.DebugOn { ncutils.Log("subscribed to node updates for node " + cfg.Node.Name + " update/" + cfg.Node.ID) } - if token := client.Subscribe("update/peers/"+cfg.Node.ID, 0, UpdatePeers); token.Wait() && token.Error() != nil { + if token := client.Subscribe("update/peers/"+cfg.Node.ID, 0, mqtt.MessageHandler(UpdatePeers)); token.Wait() && token.Error() != nil { log.Fatal(token.Error()) } if cfg.DebugOn { @@ -115,7 +113,7 @@ var All mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { } // NodeUpdate -- mqtt message handler for /update/ topic -var NodeUpdate mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { +func NodeUpdate(client mqtt.Client, msg mqtt.Message) { ncutils.Log("received message to update node " + string(msg.Payload())) //potentiall blocking i/o so do this in a go routine go func() { @@ -199,7 +197,7 @@ var NodeUpdate mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) } // UpdatePeers -- mqtt message handler for /update/peers/ topic -var UpdatePeers mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { +func UpdatePeers(client mqtt.Client, msg mqtt.Message) { go func() { var peerUpdate models.PeerUpdate err := json.Unmarshal(msg.Payload(), &peerUpdate) @@ -217,12 +215,10 @@ var UpdatePeers mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) var cfg config.ClientConfig cfg.Network = peerUpdate.Network cfg.ReadConfig() - var shouldReSub = shouldResub(strings.Split(cfg.Node.NetworkSettings.DefaultServerAddrs, ","), peerUpdate.ServerAddrs) + var shouldReSub = shouldResub(cfg.Node.NetworkSettings.DefaultServerAddrs, peerUpdate.ServerAddrs) if shouldReSub { - client.Disconnect(250) // kill client - // un sub, re sub.. how? - client.Unsubscribe("update/"+cfg.Node.ID, "update/peers/"+cfg.Node.ID) - cfg.Node.NetworkSettings.DefaultServerAddrs = strings.Join(peerUpdate.ServerAddrs, ",") + Resubscribe(client, &cfg) + cfg.Node.NetworkSettings.DefaultServerAddrs = peerUpdate.ServerAddrs } file := ncutils.GetNetclientPathSpecific() + cfg.Node.Interface + ".conf" err = wireguard.UpdateWgPeers(file, peerUpdate.Peers) @@ -240,8 +236,9 @@ var UpdatePeers mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) } // Resubscribe --- handles resubscribing if needed -func Resubscribe(client mqtt.Client, cfg *config.ClientConfig) { +func Resubscribe(client mqtt.Client, cfg *config.ClientConfig) error { if err := config.ModConfig(&cfg.Node); err == nil { + ncutils.Log("resubbing on network " + cfg.Node.Network) client.Disconnect(250) client = SetupMQTT(cfg) if token := client.Subscribe("update/"+cfg.Node.ID, 0, NodeUpdate); token.Wait() && token.Error() != nil { @@ -254,8 +251,10 @@ func Resubscribe(client mqtt.Client, cfg *config.ClientConfig) { log.Fatal(token.Error()) } ncutils.Log("finished re subbing") + return nil } else { ncutils.Log("could not mod config when re-subbing") + return err } } @@ -355,12 +354,12 @@ func Hello(cfg *config.ClientConfig, network string) { client.Disconnect(250) } -func shouldResub(currentServers, newServers []string) bool { +func shouldResub(currentServers, newServers []models.ServerAddr) bool { if len(currentServers) != len(newServers) { - return false + return true } for _, srv := range currentServers { - if !ncutils.StringSliceContains(newServers, srv) { + if !ncutils.ServerAddrSliceContains(newServers, srv) { return true } } diff --git a/netclient/ncutils/netclientutils.go b/netclient/ncutils/netclientutils.go index b1957e51..0f158229 100644 --- a/netclient/ncutils/netclientutils.go +++ b/netclient/ncutils/netclientutils.go @@ -17,6 +17,7 @@ import ( "strings" "time" + "github.com/gravitl/netmaker/models" "golang.zx2c4.com/wireguard/wgctrl" "golang.zx2c4.com/wireguard/wgctrl/wgtypes" "google.golang.org/grpc" @@ -533,10 +534,10 @@ func CheckWG() { } } -// StringSliceContains - sees if a string slice contains a string element -func StringSliceContains(slice []string, item string) bool { +// ServerAddrSliceContains - sees if a string slice contains a string element +func ServerAddrSliceContains(slice []models.ServerAddr, item models.ServerAddr) bool { for _, s := range slice { - if s == item { + if s.Address == item.Address && s.IsLeader == item.IsLeader { return true } } From f5d763ca9f1d838804814f5ffabda855a4bf1a79 Mon Sep 17 00:00:00 2001 From: 0xdcarns Date: Wed, 26 Jan 2022 11:20:46 -0500 Subject: [PATCH 4/6] added mutex on log --- logger/logger.go | 4 ++ netclient/config/config.go | 3 +- netclient/functions/daemon.go | 5 +- netclient/functions/join.go | 114 +++++++++++++++------------------- 4 files changed, 60 insertions(+), 66 deletions(-) diff --git a/logger/logger.go b/logger/logger.go index 6ffd6ad0..b01ffc65 100644 --- a/logger/logger.go +++ b/logger/logger.go @@ -6,6 +6,7 @@ import ( "sort" "strconv" "strings" + "sync" "time" ) @@ -36,6 +37,9 @@ func ResetLogs() { // Log - handles adding logs func Log(verbosity int, message ...string) { + var mu sync.Mutex + mu.Lock() + defer mu.Unlock() var currentTime = time.Now() var currentMessage = makeString(message...) if int32(verbosity) <= getVerbose() && getVerbose() >= 0 { diff --git a/netclient/config/config.go b/netclient/config/config.go index fb602bfa..f27d98f6 100644 --- a/netclient/config/config.go +++ b/netclient/config/config.go @@ -119,7 +119,8 @@ func ModConfig(node *models.Node) error { modconfig.Node = (*node) modconfig.NetworkSettings = node.NetworkSettings - log.Printf("%v \n", modconfig) + log.Printf("%v \n", node.NetworkSettings) + log.Printf("%v \n", modconfig.NetworkSettings) err = Write(&modconfig, network) return err } diff --git a/netclient/functions/daemon.go b/netclient/functions/daemon.go index 56b35265..07336940 100644 --- a/netclient/functions/daemon.go +++ b/netclient/functions/daemon.go @@ -59,10 +59,11 @@ func Daemon() error { // SetupMQTT creates a connection to broker and return client func SetupMQTT(cfg *config.ClientConfig) mqtt.Client { opts := mqtt.NewClientOptions() - for i, server := range cfg.Node.NetworkSettings.DefaultServerAddrs { + for _, server := range cfg.Node.NetworkSettings.DefaultServerAddrs { if server.Address != "" && server.IsLeader { - ncutils.Log(fmt.Sprintf("adding server (%d) to listen on network %s \n", (i + 1), cfg.Node.Network)) + ncutils.Log(fmt.Sprintf("adding server (%s) to listen on network %s \n", server.Address, cfg.Node.Network)) opts.AddBroker(server.Address + ":1883") + break } } opts.SetDefaultPublishHandler(All) diff --git a/netclient/functions/join.go b/netclient/functions/join.go index b37bf24c..7004941b 100644 --- a/netclient/functions/join.go +++ b/netclient/functions/join.go @@ -7,6 +7,7 @@ import ( "fmt" "log" "os/exec" + "runtime" "github.com/google/uuid" nodepb "github.com/gravitl/netmaker/grpc" @@ -101,8 +102,7 @@ func JoinNetwork(cfg config.ClientConfig, privateKey string) error { // make sure name is appropriate, if not, give blank name cfg.Node.Name = formatName(cfg.Node) // differentiate between client/server here - var node models.Node // fill this node with appropriate calls - postnode := &models.Node{ + var node = models.Node{ Password: cfg.Node.Password, ID: cfg.Node.ID, MacAddress: cfg.Node.MacAddress, @@ -124,45 +124,18 @@ func JoinNetwork(cfg config.ClientConfig, privateKey string) error { UDPHolePunch: cfg.Node.UDPHolePunch, } - if cfg.Node.IsServer != "yes" { - ncutils.Log("joining " + cfg.Network + " at " + cfg.Server.GRPCAddress) - var wcclient nodepb.NodeServiceClient + ncutils.Log("joining " + cfg.Network + " at " + cfg.Server.GRPCAddress) + var wcclient nodepb.NodeServiceClient - conn, err := grpc.Dial(cfg.Server.GRPCAddress, - ncutils.GRPCRequestOpts(cfg.Server.GRPCSSL)) + conn, err := grpc.Dial(cfg.Server.GRPCAddress, + ncutils.GRPCRequestOpts(cfg.Server.GRPCSSL)) - if err != nil { - log.Fatalf("Unable to establish client connection to "+cfg.Server.GRPCAddress+": %v", err) - } - defer conn.Close() - wcclient = nodepb.NewNodeServiceClient(conn) - - if err = config.ModConfig(postnode); err != nil { - return err - } - data, err := json.Marshal(postnode) - if err != nil { - return err - } - // Create node on server - res, err := wcclient.CreateNode( - context.TODO(), - &nodepb.Object{ - Data: string(data), - Type: nodepb.NODE_TYPE, - }, - ) - if err != nil { - return err - } - ncutils.PrintLog("node created on remote server...updating configs", 1) - - nodeData := res.Data - if err = json.Unmarshal([]byte(nodeData), &node); err != nil { - return err - } - log.Printf("%v \n", nodeData) + if err != nil { + log.Fatalf("Unable to establish client connection to "+cfg.Server.GRPCAddress+": %v", err) } + defer conn.Close() + wcclient = nodepb.NewNodeServiceClient(conn) + // log.Printf("%v \n", nodeData) // get free port based on returned default listen port node.ListenPort, err = ncutils.GetFreePort(node.ListenPort) @@ -183,33 +156,48 @@ func JoinNetwork(cfg config.ClientConfig, privateKey string) error { cfg.Node.IsStatic = "yes" } - if node.IsServer != "yes" { // == handle client side == - cfg.Node = node - err = config.ModConfig(&node) - if err != nil { - return err - } - err = wireguard.StorePrivKey(privateKey, cfg.Network) - if err != nil { - return err - } - if node.IsPending == "yes" { - ncutils.Log("Node is marked as PENDING.") - ncutils.Log("Awaiting approval from Admin before configuring WireGuard.") - if cfg.Daemon != "off" { - return daemon.InstallDaemon(cfg) - } - } - // pushing any local changes to server before starting wireguard - err = Push(cfg.Network) - if err != nil { - return err - } - // attempt to make backup - if err = config.SaveBackup(node.Network); err != nil { - ncutils.Log("failed to make backup, node will not auto restore if config is corrupted") + err = wireguard.StorePrivKey(privateKey, cfg.Network) + if err != nil { + return err + } + if node.IsPending == "yes" { + ncutils.Log("Node is marked as PENDING.") + ncutils.Log("Awaiting approval from Admin before configuring WireGuard.") + if cfg.Daemon != "off" { + return daemon.InstallDaemon(cfg) } } + data, err := json.Marshal(&node) + if err != nil { + return err + } + // Create node on server + res, err := wcclient.CreateNode( + context.TODO(), + &nodepb.Object{ + Data: string(data), + Type: nodepb.NODE_TYPE, + }, + ) + if err != nil { + return err + } + ncutils.PrintLog("node created on remote server...updating configs", 1) + + nodeData := res.Data + if err = json.Unmarshal([]byte(nodeData), &node); err != nil { + return err + } + node.OS = runtime.GOOS + cfg.Node = node + err = config.ModConfig(&node) + if err != nil { + return err + } + // attempt to make backup + if err = config.SaveBackup(node.Network); err != nil { + ncutils.Log("failed to make backup, node will not auto restore if config is corrupted") + } ncutils.Log("retrieving peers") peers, hasGateway, gateways, err := server.GetPeers(node.MacAddress, cfg.Network, cfg.Server.GRPCAddress, node.IsDualStack == "yes", node.IsIngressGateway == "yes", node.IsServer == "yes") From dc93bd084eb52ee75c3318accc445eb2b51a3509 Mon Sep 17 00:00:00 2001 From: 0xdcarns Date: Wed, 26 Jan 2022 11:21:56 -0500 Subject: [PATCH 5/6] cleaned logs --- netclient/config/config.go | 2 -- netclient/functions/join.go | 1 - 2 files changed, 3 deletions(-) diff --git a/netclient/config/config.go b/netclient/config/config.go index f27d98f6..66d19762 100644 --- a/netclient/config/config.go +++ b/netclient/config/config.go @@ -119,8 +119,6 @@ func ModConfig(node *models.Node) error { modconfig.Node = (*node) modconfig.NetworkSettings = node.NetworkSettings - log.Printf("%v \n", node.NetworkSettings) - log.Printf("%v \n", modconfig.NetworkSettings) err = Write(&modconfig, network) return err } diff --git a/netclient/functions/join.go b/netclient/functions/join.go index 7004941b..c8451f42 100644 --- a/netclient/functions/join.go +++ b/netclient/functions/join.go @@ -135,7 +135,6 @@ func JoinNetwork(cfg config.ClientConfig, privateKey string) error { } defer conn.Close() wcclient = nodepb.NewNodeServiceClient(conn) - // log.Printf("%v \n", nodeData) // get free port based on returned default listen port node.ListenPort, err = ncutils.GetFreePort(node.ListenPort) From 1ef52077a7f0c7bd10dcb60bf2b2e5a23c8becc0 Mon Sep 17 00:00:00 2001 From: 0xdcarns Date: Wed, 26 Jan 2022 11:24:47 -0500 Subject: [PATCH 6/6] added beginning of re sub logic --- netclient/functions/daemon.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/netclient/functions/daemon.go b/netclient/functions/daemon.go index 07336940..a8d31bb6 100644 --- a/netclient/functions/daemon.go +++ b/netclient/functions/daemon.go @@ -115,7 +115,6 @@ var All mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { // NodeUpdate -- mqtt message handler for /update/ topic func NodeUpdate(client mqtt.Client, msg mqtt.Message) { - ncutils.Log("received message to update node " + string(msg.Payload())) //potentiall blocking i/o so do this in a go routine go func() { var newNode models.Node @@ -125,6 +124,7 @@ func NodeUpdate(client mqtt.Client, msg mqtt.Message) { ncutils.Log("error unmarshalling node update data" + err.Error()) return } + ncutils.Log("received message to update node " + newNode.Name) // see if cache hit, if so skip var currentMessage = read(newNode.Network, lastNodeUpdate) if currentMessage == string(msg.Payload()) {