From 239b9d36fc55b721bd9466bc2080e4e31e2301fe Mon Sep 17 00:00:00 2001 From: afeiszli Date: Wed, 2 Feb 2022 11:56:39 -0500 Subject: [PATCH] change queue identifier from server.ID to Network.NetID --- controllers/node_grpc.go | 7 +-- controllers/server_util.go | 6 +- logic/nodes.go | 104 ++++++++-------------------------- logic/peers.go | 2 +- models/structs.go | 1 - mq/mq.go | 23 ++++---- netclient/functions/daemon.go | 51 ++++------------- serverctl/serverctl.go | 8 +-- 8 files changed, 52 insertions(+), 150 deletions(-) diff --git a/controllers/node_grpc.go b/controllers/node_grpc.go index b9863d9e..bdcc70af 100644 --- a/controllers/node_grpc.go +++ b/controllers/node_grpc.go @@ -107,14 +107,10 @@ func (s *NodeServiceServer) CreateNode(ctx context.Context, req *nodepb.Object) go func(node *models.Node) { if node.UDPHolePunch == "yes" { - var currentServerNodeID, getErr = logic.GetNetworkServerNodeID(node.Network) + var currentServerNode, getErr = logic.GetNetworkServerLeader(node.Network) if getErr != nil { return } - var currentServerNode, currErr = logic.GetNodeByID(currentServerNodeID) - if currErr != nil { - return - } for i := 0; i < 5; i++ { if logic.HasPeerConnected(node) { if logic.ShouldPublishPeerPorts(¤tServerNode) { @@ -190,7 +186,6 @@ func getServerAddrs(node *models.Node) { serverAddrs = append(serverAddrs, models.ServerAddr{ IsLeader: logic.IsLeader(&node), Address: node.Address, - ID: node.ID, }) } diff --git a/controllers/server_util.go b/controllers/server_util.go index d21795a8..d9ae9e58 100644 --- a/controllers/server_util.go +++ b/controllers/server_util.go @@ -23,14 +23,10 @@ func runServerPeerUpdate(node *models.Node, ifaceDelta bool) error { if servercfg.IsClientMode() != "on" { return nil } - var currentServerNodeID, getErr = logic.GetNetworkServerNodeID(node.Network) + var currentServerNode, getErr = logic.GetNetworkServerLeader(node.Network) if err != nil { return getErr } - var currentServerNode, currErr = logic.GetNodeByID(currentServerNodeID) - if currErr != nil { - return currErr - } if err = logic.ServerUpdate(¤tServerNode, ifaceDelta); err != nil { logger.Log(1, "server node:", currentServerNode.ID, "failed update") return err diff --git a/logic/nodes.go b/logic/nodes.go index fa1544ef..c0c1bb40 100644 --- a/logic/nodes.go +++ b/logic/nodes.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "sort" - "strings" "time" "github.com/go-playground/validator/v10" @@ -237,7 +236,6 @@ func CreateNode(node *models.Node) error { return fmt.Errorf("invalid address: ipv6 " + node.Address6 + " is not unique") } - // TODO: This covers legacy nodes, eventually want to remove legacy check node.ID = uuid.NewString() //Create a JWT for the node @@ -269,67 +267,6 @@ func CreateNode(node *models.Node) error { return err } -// IfaceDelta - is there interface changes -// func IfaceDelta(currentNode *models.Node, newNode *models.Node) bool { -// SetNodeDefaults(newNode) -// // single comparison statements -// if currentNode.IsServer != "yes" { -// return false -// } - -// if newNode.Endpoint != currentNode.Endpoint || -// newNode.LocalAddress != currentNode.LocalAddress || -// newNode.PublicKey != currentNode.PublicKey || -// newNode.Address != currentNode.Address || -// newNode.IsEgressGateway != currentNode.IsEgressGateway || -// newNode.IsIngressGateway != currentNode.IsIngressGateway || -// newNode.IsRelay != currentNode.IsRelay || -// newNode.UDPHolePunch != currentNode.UDPHolePunch || -// newNode.IsPending != currentNode.IsPending || -// newNode.PersistentKeepalive != currentNode.PersistentKeepalive || -// len(newNode.ExcludedAddrs) != len(currentNode.ExcludedAddrs) || -// len(newNode.AllowedIPs) != len(currentNode.AllowedIPs) { -// return true -// } - -// // multi-comparison statements -// if newNode.IsDualStack == "yes" { -// if newNode.Address6 != currentNode.Address6 { -// return true -// } -// } - -// if newNode.IsEgressGateway == "yes" { -// if len(currentNode.EgressGatewayRanges) != len(newNode.EgressGatewayRanges) { -// return true -// } -// for _, address := range newNode.EgressGatewayRanges { -// if !StringSliceContains(currentNode.EgressGatewayRanges, address) { -// return true -// } -// } -// } - -// if newNode.IsRelay == "yes" { -// if len(currentNode.RelayAddrs) != len(newNode.RelayAddrs) { -// return true -// } -// for _, address := range newNode.RelayAddrs { -// if !StringSliceContains(currentNode.RelayAddrs, address) { -// return true -// } -// } -// } - -// for _, address := range newNode.AllowedIPs { -// if !StringSliceContains(currentNode.AllowedIPs, address) { -// return true -// } -// } - -// return false -// } - // GetAllNodes - returns all nodes in the DB func GetAllNodes() ([]models.Node, error) { var nodes []models.Node @@ -602,28 +539,35 @@ func GetDeletedNodeByID(uuid string) (models.Node, error) { } // GetNetworkServerNodeID - get network server node ID if exists -func GetNetworkServerNodeID(network string) (string, error) { - var nodes, err = GetNetworkNodes(network) +func GetNetworkServerLeader(network string) (models.Node, error) { + nodes, err := GetSortedNetworkServerNodes(network) if err != nil { - return "", err + return models.Node{}, err } for _, node := range nodes { - if node.IsServer == "yes" { - if servercfg.GetNodeID() != "" { - if servercfg.GetNodeID() == node.MacAddress { - if strings.Contains(node.ID, "###") { - DeleteNodeByMacAddress(&node, true) - logger.Log(1, "deleted legacy server node on network "+node.Network) - return "", errors.New("deleted legacy server node on network " + node.Network) - } - return node.ID, nil - } - continue - } - return node.ID, nil + if IsLeader(&node) { + return node, nil } } - return "", errors.New("could not find server node") + return models.Node{}, errors.New("could not find server leader") +} + +// GetNetworkServerNodeID - get network server node ID if exists +func GetNetworkServerLocal(network string) (models.Node, error) { + nodes, err := GetSortedNetworkServerNodes(network) + if err != nil { + return models.Node{}, err + } + mac := servercfg.GetNodeID() + if mac == "" { + return models.Node{}, fmt.Errorf("error retrieving local server node: server node ID is unset") + } + for _, node := range nodes { + if mac == node.MacAddress { + return node, nil + } + } + return models.Node{}, errors.New("could not find node for local server") } // validateServer - make sure servers dont change port or address diff --git a/logic/peers.go b/logic/peers.go index 1954c57d..fa494262 100644 --- a/logic/peers.go +++ b/logic/peers.go @@ -73,7 +73,7 @@ func GetPeerUpdate(node *models.Node) (models.PeerUpdate, error) { } peers = append(peers, peerData) if peer.IsServer == "yes" { - serverNodeAddresses = append(serverNodeAddresses, models.ServerAddr{ID: peer.ID, IsLeader: IsLeader(&peer), Address: peer.Address}) + serverNodeAddresses = append(serverNodeAddresses, models.ServerAddr{IsLeader: IsLeader(&peer), Address: peer.Address}) } } if node.IsIngressGateway == "yes" { diff --git a/models/structs.go b/models/structs.go index 43d7e50c..273408ab 100644 --- a/models/structs.go +++ b/models/structs.go @@ -177,7 +177,6 @@ type Telemetry struct { // ServerAddr - to pass to clients to tell server addresses and if it's the leader or not type ServerAddr struct { - ID string `json:"id" bson:"id" yaml:"id"` IsLeader bool `json:"isleader" bson:"isleader" yaml:"isleader"` Address string `json:"address" bson:"address" yaml:"address"` } diff --git a/mq/mq.go b/mq/mq.go index 9bddeeb0..e59466ea 100644 --- a/mq/mq.go +++ b/mq/mq.go @@ -195,13 +195,7 @@ func Keepalive(ctx context.Context) { logger.Log(1, "error retrieving networks for keepalive", err.Error()) } for _, network := range networks { - var id string - for _, servAddr := range network.DefaultServerAddrs { - if servAddr.IsLeader { - id = servAddr.ID - } - } - serverNode, errN := logic.GetNodeByID(id) + serverNode, errN := logic.GetNetworkServerLeader(network.NetID) if errN == nil { serverNode.SetLastCheckIn() logic.UpdateNode(&serverNode, &serverNode) @@ -210,14 +204,19 @@ func Keepalive(ctx context.Context) { } err = PublishPeerUpdate(&serverNode) if err != nil { - logger.Log(1, "error publishing udp port updates", err.Error()) + logger.Log(1, "error publishing udp port updates for network", network.NetID) + logger.Log(1, errN.Error()) } - } - if id == "" { - logger.Log(0, "leader not defined for network", network.NetID) + } else { + logger.Log(1, "unable to retrieve leader for network ", network.NetID) + logger.Log(1, errN.Error()) continue } - if token := client.Publish("serverkeepalive/"+id, 0, false, servercfg.GetVersion()); token.Wait() && token.Error() != nil { + if serverNode.Address == "" { + logger.Log(1, "leader not defined for network ", network.NetID) + continue + } + if token := client.Publish("serverkeepalive/"+network.NetID, 0, false, servercfg.GetVersion()); token.Wait() && token.Error() != nil { logger.Log(1, "error publishing server keepalive for network", network.NetID, token.Error().Error()) } else { logger.Log(2, "keepalive sent for network", network.NetID) diff --git a/netclient/functions/daemon.go b/netclient/functions/daemon.go index 6323f7a3..89b0e62a 100644 --- a/netclient/functions/daemon.go +++ b/netclient/functions/daemon.go @@ -3,7 +3,6 @@ package functions import ( "context" "encoding/json" - "errors" "fmt" "os" "os/signal" @@ -161,25 +160,24 @@ func MessageQueue(ctx context.Context, network string) { if cfg.DebugOn { ncutils.Log(fmt.Sprintf("subscribed to peer updates for node %s peers/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID)) } - var id string var found bool for _, server := range cfg.NetworkSettings.DefaultServerAddrs { - if server.IsLeader { - id = server.ID + if !server.IsLeader { + continue } if server.Address != "" { - if token := client.Subscribe("serverkeepalive/"+id, 0, mqtt.MessageHandler(ServerKeepAlive)); token.Wait() && token.Error() != nil { + if token := client.Subscribe("serverkeepalive/"+cfg.Node.Network, 0, mqtt.MessageHandler(ServerKeepAlive)); token.Wait() && token.Error() != nil { ncutils.Log(token.Error().Error()) return } found = true if cfg.DebugOn { - ncutils.Log("subscribed to server keepalives for server " + id) + ncutils.Log("subscribed to server keepalives for server " + cfg.Node.Network) } } } if !found { - ncutils.Log("leader not defined for network " + cfg.Network) + ncutils.Log("leader not defined for network " + cfg.Node.Network) } defer client.Disconnect(250) go MonitorKeepalive(ctx, client, &cfg) @@ -342,19 +340,13 @@ func UpdatePeers(client mqtt.Client, msg mqtt.Message) { // MonitorKeepalive - checks time last server keepalive received. If more than 3+ minutes, notify and resubscribe func MonitorKeepalive(ctx context.Context, client mqtt.Client, cfg *config.ClientConfig) { - var id string - for _, servAddr := range cfg.NetworkSettings.DefaultServerAddrs { - if servAddr.IsLeader { - id = servAddr.ID - } - } for { select { case <-ctx.Done(): return case <-time.After(time.Second * 150): var keepalivetime time.Time - keepaliveval, ok := keepalive.Load(id) + keepaliveval, ok := keepalive.Load(cfg.Node.Network) if ok { keepalivetime = keepaliveval.(time.Time) } else { @@ -374,15 +366,7 @@ func MonitorKeepalive(ctx context.Context, client mqtt.Client, cfg *config.Clien // ServerKeepAlive -- handler to react to keepalive messages published by server func ServerKeepAlive(client mqtt.Client, msg mqtt.Message) { - // var mu sync.Mutex - // mu.Lock() - // defer mu.Unlock() - serverid, err := getID(msg.Topic()) - if err != nil { - ncutils.Log("invalid ID in serverkeepalive topic") - } - keepalive.Store(serverid, time.Now()) - // ncutils.Log("keepalive from server") + keepalive.Store(parseNetworkFromTopic(msg.Topic()), time.Now()) } // Resubscribe --- handles resubscribing if needed @@ -402,25 +386,24 @@ func Resubscribe(client mqtt.Client, cfg *config.ClientConfig) error { ncutils.Log("error resubscribing to peers for " + cfg.Node.Network) return token.Error() } - var id string var found bool for _, server := range cfg.NetworkSettings.DefaultServerAddrs { - if server.IsLeader { - id = server.ID + if !server.IsLeader { + continue } if server.Address != "" { - if token := client.Subscribe("serverkeepalive/"+id, 0, mqtt.MessageHandler(ServerKeepAlive)); token.Wait() && token.Error() != nil { + if token := client.Subscribe("serverkeepalive/"+cfg.Node.Network, 0, mqtt.MessageHandler(ServerKeepAlive)); token.Wait() && token.Error() != nil { ncutils.Log("error resubscribing to serverkeepalive for " + cfg.Node.Network) return token.Error() } found = true if cfg.DebugOn { - ncutils.Log("subscribed to server keepalives for server " + id) + ncutils.Log("subscribed to server keepalives for server " + cfg.Node.Network) } } } if !found { - ncutils.Log("leader not defined for network " + cfg.Network) + ncutils.Log("leader not defined for network " + cfg.Node.Network) } ncutils.Log("finished re subbing") return nil @@ -583,13 +566,3 @@ func shouldResub(currentServers, newServers []models.ServerAddr) bool { } return false } - -func getID(topic string) (string, error) { - parts := strings.Split(topic, "/") - count := len(parts) - if count == 1 { - return "", errors.New("invalid topic") - } - //the last part of the topic will be the network.ID - return parts[count-1], nil -} diff --git a/serverctl/serverctl.go b/serverctl/serverctl.go index ca394155..e1c4f7c4 100644 --- a/serverctl/serverctl.go +++ b/serverctl/serverctl.go @@ -44,12 +44,8 @@ func SyncServerNetwork(network string) error { } } - serverNodeID, err := logic.GetNetworkServerNodeID(network) - if !ifaceExists && (err == nil && serverNodeID != "") { - serverNode, err := logic.GetNodeByID(serverNodeID) - if err != nil { - return err - } + serverNode, err := logic.GetNetworkServerLocal(network) + if !ifaceExists && (err == nil && serverNode.ID != "") { return logic.ServerUpdate(&serverNode, true) } else if !ifaceExists { _, err := logic.ServerJoin(&serverNetworkSettings)