diff --git a/controllers/node_grpc.go b/controllers/node_grpc.go index 53bb8d99..8edebf5f 100644 --- a/controllers/node_grpc.go +++ b/controllers/node_grpc.go @@ -68,12 +68,14 @@ func (s *NodeServiceServer) CreateNode(ctx context.Context, req *nodepb.Object) } var serverNodes = logic.GetServerNodes(node.Network) + var serverAddrs = make([]models.ServerAddr, len(serverNodes)) for i, server := range serverNodes { - node.NetworkSettings.DefaultServerAddrs += server.Address - if i < len(serverNodes)-1 { - node.NetworkSettings.DefaultServerAddrs += "," + serverAddrs[i] = models.ServerAddr{ + IsLeader: logic.IsLeader(&server), + Address: server.Address, } } + node.NetworkSettings.DefaultServerAddrs = serverAddrs err = logic.CreateNode(&node) if err != nil { diff --git a/logic/peers.go b/logic/peers.go index caa0da25..44e4ae5c 100644 --- a/logic/peers.go +++ b/logic/peers.go @@ -20,7 +20,7 @@ func GetPeerUpdate(node *models.Node) (models.PeerUpdate, error) { if err != nil { return models.PeerUpdate{}, err } - var serverNodeAddresses = []string{} + var serverNodeAddresses = []models.ServerAddr{} for _, peer := range networkNodes { if peer.ID == node.ID { //skip yourself @@ -57,7 +57,7 @@ func GetPeerUpdate(node *models.Node) (models.PeerUpdate, error) { } peers = append(peers, peerData) if peer.IsServer == "yes" { - serverNodeAddresses = append(serverNodeAddresses, peer.Address) + serverNodeAddresses = append(serverNodeAddresses, models.ServerAddr{IsLeader: IsLeader(&peer), Address: peer.Address}) } } peerUpdate.Network = node.Network diff --git a/models/mqtt.go b/models/mqtt.go index 10b8d063..e9bead41 100644 --- a/models/mqtt.go +++ b/models/mqtt.go @@ -4,9 +4,9 @@ import "golang.zx2c4.com/wireguard/wgctrl/wgtypes" // PeerUpdate - struct type PeerUpdate struct { - Network string `json:"network" bson:"network"` - ServerAddrs []string `json:"serversaddrs" bson:"serversaddrs"` - Peers []wgtypes.PeerConfig `json:"peers" bson:"peers"` + Network string `json:"network" bson:"network" yaml:"network"` + ServerAddrs []ServerAddr `json:"serveraddrs" bson:"serveraddrs" yaml:"serveraddrs"` + Peers []wgtypes.PeerConfig `json:"peers" bson:"peers" yaml:"peers"` } // KeyUpdate - key update struct diff --git a/models/network.go b/models/network.go index 0103199b..690faeb8 100644 --- a/models/network.go +++ b/models/network.go @@ -34,11 +34,11 @@ type Network struct { LocalRange string `json:"localrange" bson:"localrange" validate:"omitempty,cidr"` // checkin interval is depreciated at the network level. Set on server with CHECKIN_INTERVAL - DefaultCheckInInterval int32 `json:"checkininterval,omitempty" bson:"checkininterval,omitempty" validate:"omitempty,numeric,min=2,max=100000"` - DefaultUDPHolePunch string `json:"defaultudpholepunch" bson:"defaultudpholepunch" validate:"checkyesorno"` - DefaultExtClientDNS string `json:"defaultextclientdns" bson:"defaultextclientdns"` - DefaultMTU int32 `json:"defaultmtu" bson:"defaultmtu"` - DefaultServerAddrs string `json:"defaultserveraddrs" bson:"defaultserveraddrs" yaml:"defaultserveraddrs"` + DefaultCheckInInterval int32 `json:"checkininterval,omitempty" bson:"checkininterval,omitempty" validate:"omitempty,numeric,min=2,max=100000"` + DefaultUDPHolePunch string `json:"defaultudpholepunch" bson:"defaultudpholepunch" validate:"checkyesorno"` + DefaultExtClientDNS string `json:"defaultextclientdns" bson:"defaultextclientdns"` + DefaultMTU int32 `json:"defaultmtu" bson:"defaultmtu"` + DefaultServerAddrs []ServerAddr `json:"defaultserveraddrs" bson:"defaultserveraddrs" yaml:"defaultserveraddrs"` } // SaveData - sensitive fields of a network that should be kept the same diff --git a/models/structs.go b/models/structs.go index 3e772bc3..3020c1bd 100644 --- a/models/structs.go +++ b/models/structs.go @@ -169,3 +169,9 @@ type Telemetry struct { UUID string `json:"uuid" bson:"uuid"` LastSend int64 `json:"lastsend" bson:"lastsend"` } + +// ServerAddr - to pass to clients to tell server addresses and if it's the leader or not +type ServerAddr struct { + IsLeader bool `json:"isleader" bson:"isleader" yaml:"isleader"` + Address string `json:"address" bson:"address" yaml:"address"` +} diff --git a/netclient/functions/daemon.go b/netclient/functions/daemon.go index 4f01475b..56b35265 100644 --- a/netclient/functions/daemon.go +++ b/netclient/functions/daemon.go @@ -8,7 +8,6 @@ import ( "os" "os/signal" "runtime" - "strings" "sync" "syscall" "time" @@ -60,11 +59,10 @@ func Daemon() error { // SetupMQTT creates a connection to broker and return client func SetupMQTT(cfg *config.ClientConfig) mqtt.Client { opts := mqtt.NewClientOptions() - serverAddrs := strings.Split(cfg.Node.NetworkSettings.DefaultServerAddrs, ",") - for i, addr := range serverAddrs { - if addr != "" { + for i, server := range cfg.Node.NetworkSettings.DefaultServerAddrs { + if server.Address != "" && server.IsLeader { ncutils.Log(fmt.Sprintf("adding server (%d) to listen on network %s \n", (i + 1), cfg.Node.Network)) - opts.AddBroker(addr + ":1883") + opts.AddBroker(server.Address + ":1883") } } opts.SetDefaultPublishHandler(All) @@ -89,13 +87,13 @@ func MessageQueue(ctx context.Context, network string) { } ncutils.Log("subscribed to all topics for debugging purposes") } - if token := client.Subscribe("update/"+cfg.Node.ID, 0, NodeUpdate); token.Wait() && token.Error() != nil { + if token := client.Subscribe("update/"+cfg.Node.ID, 0, mqtt.MessageHandler(NodeUpdate)); token.Wait() && token.Error() != nil { log.Fatal(token.Error()) } if cfg.DebugOn { ncutils.Log("subscribed to node updates for node " + cfg.Node.Name + " update/" + cfg.Node.ID) } - if token := client.Subscribe("update/peers/"+cfg.Node.ID, 0, UpdatePeers); token.Wait() && token.Error() != nil { + if token := client.Subscribe("update/peers/"+cfg.Node.ID, 0, mqtt.MessageHandler(UpdatePeers)); token.Wait() && token.Error() != nil { log.Fatal(token.Error()) } if cfg.DebugOn { @@ -115,7 +113,7 @@ var All mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { } // NodeUpdate -- mqtt message handler for /update/ topic -var NodeUpdate mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { +func NodeUpdate(client mqtt.Client, msg mqtt.Message) { ncutils.Log("received message to update node " + string(msg.Payload())) //potentiall blocking i/o so do this in a go routine go func() { @@ -199,7 +197,7 @@ var NodeUpdate mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) } // UpdatePeers -- mqtt message handler for /update/peers/ topic -var UpdatePeers mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { +func UpdatePeers(client mqtt.Client, msg mqtt.Message) { go func() { var peerUpdate models.PeerUpdate err := json.Unmarshal(msg.Payload(), &peerUpdate) @@ -217,12 +215,10 @@ var UpdatePeers mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) var cfg config.ClientConfig cfg.Network = peerUpdate.Network cfg.ReadConfig() - var shouldReSub = shouldResub(strings.Split(cfg.Node.NetworkSettings.DefaultServerAddrs, ","), peerUpdate.ServerAddrs) + var shouldReSub = shouldResub(cfg.Node.NetworkSettings.DefaultServerAddrs, peerUpdate.ServerAddrs) if shouldReSub { - client.Disconnect(250) // kill client - // un sub, re sub.. how? - client.Unsubscribe("update/"+cfg.Node.ID, "update/peers/"+cfg.Node.ID) - cfg.Node.NetworkSettings.DefaultServerAddrs = strings.Join(peerUpdate.ServerAddrs, ",") + Resubscribe(client, &cfg) + cfg.Node.NetworkSettings.DefaultServerAddrs = peerUpdate.ServerAddrs } file := ncutils.GetNetclientPathSpecific() + cfg.Node.Interface + ".conf" err = wireguard.UpdateWgPeers(file, peerUpdate.Peers) @@ -240,8 +236,9 @@ var UpdatePeers mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) } // Resubscribe --- handles resubscribing if needed -func Resubscribe(client mqtt.Client, cfg *config.ClientConfig) { +func Resubscribe(client mqtt.Client, cfg *config.ClientConfig) error { if err := config.ModConfig(&cfg.Node); err == nil { + ncutils.Log("resubbing on network " + cfg.Node.Network) client.Disconnect(250) client = SetupMQTT(cfg) if token := client.Subscribe("update/"+cfg.Node.ID, 0, NodeUpdate); token.Wait() && token.Error() != nil { @@ -254,8 +251,10 @@ func Resubscribe(client mqtt.Client, cfg *config.ClientConfig) { log.Fatal(token.Error()) } ncutils.Log("finished re subbing") + return nil } else { ncutils.Log("could not mod config when re-subbing") + return err } } @@ -355,12 +354,12 @@ func Hello(cfg *config.ClientConfig, network string) { client.Disconnect(250) } -func shouldResub(currentServers, newServers []string) bool { +func shouldResub(currentServers, newServers []models.ServerAddr) bool { if len(currentServers) != len(newServers) { - return false + return true } for _, srv := range currentServers { - if !ncutils.StringSliceContains(newServers, srv) { + if !ncutils.ServerAddrSliceContains(newServers, srv) { return true } } diff --git a/netclient/ncutils/netclientutils.go b/netclient/ncutils/netclientutils.go index b1957e51..0f158229 100644 --- a/netclient/ncutils/netclientutils.go +++ b/netclient/ncutils/netclientutils.go @@ -17,6 +17,7 @@ import ( "strings" "time" + "github.com/gravitl/netmaker/models" "golang.zx2c4.com/wireguard/wgctrl" "golang.zx2c4.com/wireguard/wgctrl/wgtypes" "google.golang.org/grpc" @@ -533,10 +534,10 @@ func CheckWG() { } } -// StringSliceContains - sees if a string slice contains a string element -func StringSliceContains(slice []string, item string) bool { +// ServerAddrSliceContains - sees if a string slice contains a string element +func ServerAddrSliceContains(slice []models.ServerAddr, item models.ServerAddr) bool { for _, s := range slice { - if s == item { + if s.Address == item.Address && s.IsLeader == item.IsLeader { return true } }