From 80fda83c0c62fc960c5b5b2d49c5cd47ac457291 Mon Sep 17 00:00:00 2001 From: 0xdcarns Date: Thu, 23 Feb 2023 10:55:42 -0500 Subject: [PATCH 1/4] adjusted peer updates to remove peers properly, slowed zombie check --- controllers/ext_client.go | 9 +- logic/peers.go | 179 ++------------------------------------ logic/zombie.go | 6 +- mq/handlers.go | 2 +- mq/publishers.go | 15 +--- 5 files changed, 17 insertions(+), 194 deletions(-) diff --git a/controllers/ext_client.go b/controllers/ext_client.go index 77256070..96ab1e7c 100644 --- a/controllers/ext_client.go +++ b/controllers/ext_client.go @@ -389,7 +389,7 @@ func createExtClient(w http.ResponseWriter, r *http.Request) { logger.Log(0, r.Header.Get("user"), "created new ext client on network", networkName) w.WriteHeader(http.StatusOK) go func() { - err = mq.PublishExtPeerUpdate(&node) + err = mq.PublishPeerUpdate() if err != nil { logger.Log(1, "error setting ext peers on "+nodeid+": "+err.Error()) } @@ -488,7 +488,7 @@ func updateExtClient(w http.ResponseWriter, r *http.Request) { logger.Log(0, r.Header.Get("user"), "updated ext client", newExtClient.ClientID) if changedEnabled { // need to send a peer update to the ingress node as enablement of one of it's clients has changed if ingressNode, err := logic.GetNodeByID(newclient.IngressGatewayID); err == nil { - if err = mq.PublishExtPeerUpdate(&ingressNode); err != nil { + if err = mq.PublishPeerUpdate(); err != nil { logger.Log(1, "error setting ext peers on", ingressNode.ID.String(), ":", err.Error()) } } @@ -567,11 +567,10 @@ func deleteExtClient(w http.ResponseWriter, r *http.Request) { } go func() { - err = mq.PublishExtPeerUpdate(&ingressnode) - if err != nil { + if err := mq.PublishPeerUpdate(); err != nil { logger.Log(1, "error setting ext peers on "+ingressnode.ID.String()+": "+err.Error()) } - if err := mq.PublishDeleteExtClientDNS(&extclient); err != nil { + if err = mq.PublishDeleteExtClientDNS(&extclient); err != nil { logger.Log(1, "error publishing dns update for extclient deletion", err.Error()) } }() diff --git a/logic/peers.go b/logic/peers.go index 699308e2..15817c3c 100644 --- a/logic/peers.go +++ b/logic/peers.go @@ -6,7 +6,6 @@ import ( "log" "net" "net/netip" - "time" "github.com/gravitl/netmaker/database" "github.com/gravitl/netmaker/logger" @@ -17,177 +16,6 @@ import ( "golang.zx2c4.com/wireguard/wgctrl/wgtypes" ) -// GetPeersforProxy calculates the peers for a proxy -// TODO ========================== -// TODO ========================== -// TODO ========================== -// TODO ========================== -// TODO ========================== -// revisit this logic with new host/node models. -func GetPeersForProxy(node *models.Node, onlyPeers bool) (models.ProxyManagerPayload, error) { - proxyPayload := models.ProxyManagerPayload{} - var peers []wgtypes.PeerConfig - peerConfMap := make(map[string]models.PeerConf) - var err error - currentPeers, err := GetNetworkNodes(node.Network) - if err != nil { - return proxyPayload, err - } - if !onlyPeers { - if node.IsRelayed { - relayNode := FindRelay(node) - relayHost, err := GetHost(relayNode.HostID.String()) - if err != nil { - return proxyPayload, err - } - if relayNode != nil { - host, err := GetHost(relayNode.HostID.String()) - if err != nil { - logger.Log(0, "error retrieving host for relay node", relayNode.HostID.String(), err.Error()) - } - relayEndpoint, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", relayHost.EndpointIP, host.ListenPort)) - if err != nil { - logger.Log(1, "failed to resolve relay node endpoint: ", err.Error()) - } - proxyPayload.IsRelayed = true - proxyPayload.RelayedTo = relayEndpoint - } else { - logger.Log(0, "couldn't find relay node for: ", node.ID.String()) - } - - } - if node.IsRelay { - host, err := GetHost(node.HostID.String()) - if err != nil { - logger.Log(0, "error retrieving host for relay node", node.ID.String(), err.Error()) - } - relayedNodes, err := GetRelayedNodes(node) - if err != nil { - logger.Log(1, "failed to relayed nodes: ", node.ID.String(), err.Error()) - proxyPayload.IsRelay = false - } else { - relayPeersMap := make(map[string]models.RelayedConf) - for _, relayedNode := range relayedNodes { - relayedNode := relayedNode - payload, err := GetPeersForProxy(&relayedNode, true) - if err == nil { - relayedHost, err := GetHost(relayedNode.HostID.String()) - if err != nil { - logger.Log(0, "error retrieving host for relayNode", relayedNode.ID.String(), err.Error()) - } - relayedEndpoint, udpErr := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", relayedHost.EndpointIP, host.ListenPort)) - if udpErr == nil { - relayPeersMap[host.PublicKey.String()] = models.RelayedConf{ - RelayedPeerEndpoint: relayedEndpoint, - RelayedPeerPubKey: relayedHost.PublicKey.String(), - Peers: payload.Peers, - } - } - - } - } - proxyPayload.IsRelay = true - proxyPayload.RelayedPeerConf = relayPeersMap - } - } - - } - - for _, peer := range currentPeers { - if peer.ID == node.ID { - //skip yourself - continue - } - host, err := GetHost(peer.HostID.String()) - if err != nil { - continue - } - proxyStatus := host.ProxyEnabled - listenPort := host.ListenPort - if proxyStatus { - listenPort = host.ProxyListenPort - if listenPort == 0 { - listenPort = models.NmProxyPort - } - } else if listenPort == 0 { - listenPort = host.ListenPort - - } - - endpoint, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", host.EndpointIP, listenPort)) - if err != nil { - logger.Log(1, "failed to resolve udp addr for node: ", peer.ID.String(), host.EndpointIP.String(), err.Error()) - continue - } - allowedips := GetAllowedIPs(node, &peer, nil) - var keepalive time.Duration - if node.PersistentKeepalive != 0 { - // set_keepalive - keepalive = node.PersistentKeepalive - } - peers = append(peers, wgtypes.PeerConfig{ - PublicKey: host.PublicKey, - Endpoint: endpoint, - AllowedIPs: allowedips, - PersistentKeepaliveInterval: &keepalive, - ReplaceAllowedIPs: true, - }) - peerConfMap[host.PublicKey.String()] = models.PeerConf{ - Address: net.ParseIP(peer.PrimaryAddress()), - Proxy: proxyStatus, - PublicListenPort: int32(listenPort), - } - - if !onlyPeers && peer.IsRelayed { - relayNode := FindRelay(&peer) - if relayNode != nil { - relayHost, err := GetHost(relayNode.HostID.String()) - if err != nil { - logger.Log(0, "error retrieving host for relayNode", relayNode.ID.String(), err.Error()) - continue - } - relayTo, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", relayHost.EndpointIP, relayHost.ListenPort)) - if err == nil { - peerConfMap[host.PublicKey.String()] = models.PeerConf{ - - IsRelayed: true, - RelayedTo: relayTo, - Address: net.ParseIP(peer.PrimaryAddress()), - Proxy: proxyStatus, - PublicListenPort: int32(listenPort), - } - } - - } - - } - } - if node.IsIngressGateway { - var extPeers []wgtypes.PeerConfig - extPeers, peerConfMap, err = getExtPeersForProxy(node, peerConfMap) - if err == nil { - peers = append(peers, extPeers...) - - } else if !database.IsEmptyRecord(err) { - logger.Log(1, "error retrieving external clients:", err.Error()) - } - } - - proxyPayload.IsIngress = node.IsIngressGateway - addr := node.Address - if addr.String() == "" { - addr = node.Address6 - } - proxyPayload.Peers = peers - proxyPayload.PeerMap = peerConfMap - //proxyPayload.Network = node.Network - //proxyPayload.InterfaceName = node.Interface - //hardcode or read from host ?? - proxyPayload.InterfaceName = models.WIREGUARD_INTERFACE - - return proxyPayload, nil -} - // GetProxyUpdateForHost - gets the proxy update for host func GetProxyUpdateForHost(host *models.Host) (models.ProxyManagerPayload, error) { proxyPayload := models.ProxyManagerPayload{ @@ -331,7 +159,6 @@ func GetPeerUpdateForHost(network string, host *models.Host) (models.HostPeerUpd if peer.ID == node.ID { logger.Log(2, "peer update, skipping self") //skip yourself - continue } var peerConfig wgtypes.PeerConfig @@ -341,7 +168,7 @@ func GetPeerUpdateForHost(network string, host *models.Host) (models.HostPeerUpd return models.HostPeerUpdate{}, err } - if !peer.Connected || peer.Action == models.NODE_DELETE || peer.PendingDelete { + if !peer.Connected { logger.Log(2, "peer update, skipping unconnected node", peer.ID.String()) //skip unconnected nodes continue @@ -421,6 +248,9 @@ func GetPeerUpdateForHost(network string, host *models.Host) (models.HostPeerUpd var nodePeer wgtypes.PeerConfig if _, ok := hostPeerUpdate.HostPeerIDs[peerHost.PublicKey.String()]; !ok { hostPeerUpdate.HostPeerIDs[peerHost.PublicKey.String()] = make(map[string]models.IDandAddr) + if peer.Action == models.NODE_DELETE || peer.PendingDelete { + peerConfig.Remove = true + } hostPeerUpdate.Peers = append(hostPeerUpdate.Peers, peerConfig) peerIndexMap[peerHost.PublicKey.String()] = len(hostPeerUpdate.Peers) - 1 hostPeerUpdate.HostPeerIDs[peerHost.PublicKey.String()][peer.ID.String()] = models.IDandAddr{ @@ -517,6 +347,7 @@ func GetPeerUpdateForHost(network string, host *models.Host) (models.HostPeerUpd return hostPeerUpdate, nil } +// GetPeerListenPort - given a host, retrieve it's appropriate listening port func GetPeerListenPort(host *models.Host) int { peerPort := host.ListenPort if host.ProxyEnabled { diff --git a/logic/zombie.go b/logic/zombie.go index 488bfaa5..199a8b4d 100644 --- a/logic/zombie.go +++ b/logic/zombie.go @@ -10,8 +10,8 @@ import ( ) const ( - // ZOMBIE_TIMEOUT - timeout in seconds for checking zombie status - ZOMBIE_TIMEOUT = 60 + // ZOMBIE_TIMEOUT - timeout in hours for checking zombie status + ZOMBIE_TIMEOUT = 6 // ZOMBIE_DELETE_TIME - timeout in minutes for zombie node deletion ZOMBIE_DELETE_TIME = 10 ) @@ -86,7 +86,7 @@ func ManageZombies(ctx context.Context, peerUpdate chan *models.Node) { zombies = append(zombies, id) case id := <-newHostZombie: hostZombies = append(hostZombies, id) - case <-time.After(time.Second * ZOMBIE_TIMEOUT): + case <-time.After(time.Hour * ZOMBIE_TIMEOUT): // run this check 4 times a day logger.Log(3, "checking for zombie nodes") if len(zombies) > 0 { for i := len(zombies) - 1; i >= 0; i-- { diff --git a/mq/handlers.go b/mq/handlers.go index 49555238..ec7ba4d8 100644 --- a/mq/handlers.go +++ b/mq/handlers.go @@ -227,7 +227,7 @@ func UpdateMetrics(client mqtt.Client, msg mqtt.Message) { logger.Log(2, "updating peers after node", currentNode.ID.String(), currentNode.Network, "detected connectivity issues") host, err := logic.GetHost(currentNode.HostID.String()) if err == nil { - if err = PublishSingleHostUpdate(host); err != nil { + if err = PublishSingleHostPeerUpdate(host); err != nil { logger.Log(0, "failed to publish update after failover peer change for node", currentNode.ID.String(), currentNode.Network) } } diff --git a/mq/publishers.go b/mq/publishers.go index e6ac1c4d..63dc093f 100644 --- a/mq/publishers.go +++ b/mq/publishers.go @@ -25,7 +25,7 @@ func PublishPeerUpdate() error { } for _, host := range hosts { host := host - err = PublishSingleHostUpdate(&host) + err = PublishSingleHostPeerUpdate(&host) if err != nil { logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error()) } @@ -33,8 +33,8 @@ func PublishPeerUpdate() error { return err } -// PublishSingleHostUpdate --- determines and publishes a peer update to one host -func PublishSingleHostUpdate(host *models.Host) error { +// PublishSingleHostPeerUpdate --- determines and publishes a peer update to one host +func PublishSingleHostPeerUpdate(host *models.Host) error { peerUpdate, err := logic.GetPeerUpdateForHost("", host) if err != nil { @@ -56,13 +56,6 @@ func PublishSingleHostUpdate(host *models.Host) error { return publish(host, fmt.Sprintf("peers/host/%s/%s", host.ID.String(), servercfg.GetServer()), data) } -// PublishExtPeerUpdate --- publishes a peer update to all the peers of a node -func PublishExtPeerUpdate(node *models.Node) error { - - go PublishPeerUpdate() - return nil -} - // NodeUpdate -- publishes a node update func NodeUpdate(node *models.Node) error { host, err := logic.GetHost(node.HostID.String()) @@ -410,7 +403,7 @@ func sendPeers() { if force { host := host logger.Log(2, "sending scheduled peer update (5 min)") - err = PublishSingleHostUpdate(&host) + err = PublishSingleHostPeerUpdate(&host) if err != nil { logger.Log(1, "error publishing peer updates for host: ", host.ID.String(), " Err: ", err.Error()) } From 6d07de3fdd2814e9b9a0bfb94ef53d36c3b79950 Mon Sep 17 00:00:00 2001 From: 0xdcarns Date: Thu, 23 Feb 2023 10:57:30 -0500 Subject: [PATCH 2/4] fix potential range memory issues --- logic/peers.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/logic/peers.go b/logic/peers.go index 15817c3c..b4f53ffb 100644 --- a/logic/peers.go +++ b/logic/peers.go @@ -156,6 +156,7 @@ func GetPeerUpdateForHost(network string, host *models.Host) (models.HostPeerUpd nodePeerMap = make(map[string]models.PeerRouteInfo) } for _, peer := range currentPeers { + peer := peer if peer.ID == node.ID { logger.Log(2, "peer update, skipping self") //skip yourself @@ -289,6 +290,7 @@ func GetPeerUpdateForHost(network string, host *models.Host) (models.HostPeerUpd extPeers, extPeerIDAndAddrs, err = getExtPeers(&node) if err == nil { for _, extPeerIdAndAddr := range extPeerIDAndAddrs { + extPeerIdAndAddr := extPeerIdAndAddr nodePeerMap[extPeerIdAndAddr.ID] = models.PeerRouteInfo{ PeerAddr: net.IPNet{ IP: net.ParseIP(extPeerIdAndAddr.Address), @@ -300,6 +302,7 @@ func GetPeerUpdateForHost(network string, host *models.Host) (models.HostPeerUpd } hostPeerUpdate.Peers = append(hostPeerUpdate.Peers, extPeers...) for _, extPeerIdAndAddr := range extPeerIDAndAddrs { + extPeerIdAndAddr := extPeerIdAndAddr hostPeerUpdate.HostPeerIDs[extPeerIdAndAddr.ID] = make(map[string]models.IDandAddr) hostPeerUpdate.HostPeerIDs[extPeerIdAndAddr.ID][extPeerIdAndAddr.ID] = models.IDandAddr{ ID: extPeerIdAndAddr.ID, From dc540a266b66fd57d839bdf3d7031869c6bf58b5 Mon Sep 17 00:00:00 2001 From: 0xdcarns Date: Thu, 23 Feb 2023 14:32:57 -0500 Subject: [PATCH 3/4] added deleted node peer calculation --- logic/peers.go | 35 +++++++++++++++++++++++++++++++---- 1 file changed, 31 insertions(+), 4 deletions(-) diff --git a/logic/peers.go b/logic/peers.go index b4f53ffb..15917a8f 100644 --- a/logic/peers.go +++ b/logic/peers.go @@ -122,6 +122,8 @@ func GetPeerUpdateForHost(network string, host *models.Host) (models.HostPeerUpd if host == nil { return models.HostPeerUpdate{}, errors.New("host is nil") } + // track which nodes are deleted + // after peer calculation, if peer not in list, add delete config of peer hostPeerUpdate := models.HostPeerUpdate{ Host: *host, Server: servercfg.GetServer(), @@ -136,6 +138,7 @@ func GetPeerUpdateForHost(network string, host *models.Host) (models.HostPeerUpd Peers: []wgtypes.PeerConfig{}, NodePeers: []wgtypes.PeerConfig{}, } + var deletedNodes = []models.Node{} // used to track deleted nodes logger.Log(1, "peer update for host ", host.ID.String()) peerIndexMap := make(map[string]int) for _, nodeID := range host.Nodes { @@ -143,7 +146,7 @@ func GetPeerUpdateForHost(network string, host *models.Host) (models.HostPeerUpd if err != nil { continue } - if !node.Connected || node.Action == models.NODE_DELETE || node.PendingDelete { + if !node.Connected || node.PendingDelete || node.Action == models.NODE_DELETE { continue } currentPeers, err := GetNetworkNodes(node.Network) @@ -162,6 +165,10 @@ func GetPeerUpdateForHost(network string, host *models.Host) (models.HostPeerUpd //skip yourself continue } + if peer.Action == models.NODE_DELETE || peer.PendingDelete { + deletedNodes = append(deletedNodes, peer) // track deleted node for peer update + continue + } var peerConfig wgtypes.PeerConfig peerHost, err := GetHost(peer.HostID.String()) if err != nil { @@ -221,6 +228,7 @@ func GetPeerUpdateForHost(network string, host *models.Host) (models.HostPeerUpd _, extPeerIDAndAddrs, err := getExtPeers(&peer) if err == nil { for _, extPeerIdAndAddr := range extPeerIDAndAddrs { + extPeerIdAndAddr := extPeerIdAndAddr nodePeerMap[extPeerIdAndAddr.ID] = models.PeerRouteInfo{ PeerAddr: net.IPNet{ IP: net.ParseIP(extPeerIdAndAddr.Address), @@ -249,9 +257,6 @@ func GetPeerUpdateForHost(network string, host *models.Host) (models.HostPeerUpd var nodePeer wgtypes.PeerConfig if _, ok := hostPeerUpdate.HostPeerIDs[peerHost.PublicKey.String()]; !ok { hostPeerUpdate.HostPeerIDs[peerHost.PublicKey.String()] = make(map[string]models.IDandAddr) - if peer.Action == models.NODE_DELETE || peer.PendingDelete { - peerConfig.Remove = true - } hostPeerUpdate.Peers = append(hostPeerUpdate.Peers, peerConfig) peerIndexMap[peerHost.PublicKey.String()] = len(hostPeerUpdate.Peers) - 1 hostPeerUpdate.HostPeerIDs[peerHost.PublicKey.String()][peer.ID.String()] = models.IDandAddr{ @@ -347,6 +352,28 @@ func GetPeerUpdateForHost(network string, host *models.Host) (models.HostPeerUpd } } + // run through delete nodes + if len(deletedNodes) > 0 { + for i := range deletedNodes { + delNode := deletedNodes[i] + delHost, err := GetHost(delNode.HostID.String()) + if err != nil { + continue + } + if _, ok := hostPeerUpdate.HostPeerIDs[delHost.PublicKey.String()]; !ok { + var peerConfig = wgtypes.PeerConfig{} + peerConfig.PublicKey = delHost.PublicKey + peerConfig.Endpoint = &net.UDPAddr{ + IP: delHost.EndpointIP, + Port: GetPeerListenPort(delHost), + } + peerConfig.Remove = true + peerConfig.AllowedIPs = []net.IPNet{delNode.Address, delNode.Address6} + hostPeerUpdate.Peers = append(hostPeerUpdate.Peers, peerConfig) + } + } + } + return hostPeerUpdate, nil } From adf3967e0d3965a05c493f16cd34dea50bfe1427 Mon Sep 17 00:00:00 2001 From: 0xdcarns Date: Fri, 24 Feb 2023 10:37:11 -0500 Subject: [PATCH 4/4] handled forced deleted peer --- controllers/node.go | 17 ++++++++++++----- logic/peers.go | 7 +++++-- mq/handlers.go | 3 +-- mq/publishers.go | 31 ++++++++++++++++++++++++++----- 4 files changed, 44 insertions(+), 14 deletions(-) diff --git a/controllers/node.go b/controllers/node.go index 6e91b48e..be9d69e8 100644 --- a/controllers/node.go +++ b/controllers/node.go @@ -433,7 +433,7 @@ func getNode(w http.ResponseWriter, r *http.Request) { logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal")) return } - hostPeerUpdate, err := logic.GetPeerUpdateForHost(node.Network, host) + hostPeerUpdate, err := logic.GetPeerUpdateForHost(node.Network, host, nil) if err != nil && !database.IsEmptyRecord(err) { logger.Log(0, r.Header.Get("user"), fmt.Sprintf("error fetching wg peers config for host [ %s ]: %v", host.ID.String(), err)) @@ -616,7 +616,7 @@ func createNode(w http.ResponseWriter, r *http.Request) { return } } - hostPeerUpdate, err := logic.GetPeerUpdateForHost(networkName, &data.Host) + hostPeerUpdate, err := logic.GetPeerUpdateForHost(networkName, &data.Host, nil) if err != nil && !database.IsEmptyRecord(err) { logger.Log(0, r.Header.Get("user"), fmt.Sprintf("error fetching wg peers config for host [ %s ]: %v", data.Host.ID.String(), err)) @@ -985,10 +985,17 @@ func deleteNode(w http.ResponseWriter, r *http.Request) { if !fromNode { // notify node change runUpdates(&node, false) } - go func() { // notify of peer change - if err := mq.PublishPeerUpdate(); err != nil { + go func(deletedNode *models.Node, fromNode bool) { // notify of peer change + var err error + if fromNode { + err = mq.PublishDeletedNodePeerUpdate(deletedNode) + } else { + err = mq.PublishPeerUpdate() + } + if err != nil { logger.Log(1, "error publishing peer update ", err.Error()) } + host, err := logic.GetHost(node.HostID.String()) if err != nil { logger.Log(1, "failed to retrieve host for node", node.ID.String(), err.Error()) @@ -996,7 +1003,7 @@ func deleteNode(w http.ResponseWriter, r *http.Request) { if err := mq.PublishDNSDelete(&node, host); err != nil { logger.Log(1, "error publishing dns update", err.Error()) } - }() + }(&node, fromNode) } func runUpdates(node *models.Node, ifaceDelta bool) { diff --git a/logic/peers.go b/logic/peers.go index 15917a8f..dbc9583c 100644 --- a/logic/peers.go +++ b/logic/peers.go @@ -41,7 +41,7 @@ func GetProxyUpdateForHost(host *models.Host) (models.ProxyManagerPayload, error relayPeersMap := make(map[string]models.RelayedConf) for _, relayedHost := range relayedHosts { relayedHost := relayedHost - payload, err := GetPeerUpdateForHost("", &relayedHost) + payload, err := GetPeerUpdateForHost("", &relayedHost, nil) if err == nil { relayedEndpoint, udpErr := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", relayedHost.EndpointIP, GetPeerListenPort(&relayedHost))) if udpErr == nil { @@ -118,7 +118,7 @@ func GetProxyUpdateForHost(host *models.Host) (models.ProxyManagerPayload, error } // GetPeerUpdateForHost - gets the consolidated peer update for the host from all networks -func GetPeerUpdateForHost(network string, host *models.Host) (models.HostPeerUpdate, error) { +func GetPeerUpdateForHost(network string, host *models.Host, deletedNode *models.Node) (models.HostPeerUpdate, error) { if host == nil { return models.HostPeerUpdate{}, errors.New("host is nil") } @@ -139,6 +139,9 @@ func GetPeerUpdateForHost(network string, host *models.Host) (models.HostPeerUpd NodePeers: []wgtypes.PeerConfig{}, } var deletedNodes = []models.Node{} // used to track deleted nodes + if deletedNode != nil { + deletedNodes = append(deletedNodes, *deletedNode) + } logger.Log(1, "peer update for host ", host.ID.String()) peerIndexMap := make(map[string]int) for _, nodeID := range host.Nodes { diff --git a/mq/handlers.go b/mq/handlers.go index ec7ba4d8..e4b79611 100644 --- a/mq/handlers.go +++ b/mq/handlers.go @@ -227,11 +227,10 @@ func UpdateMetrics(client mqtt.Client, msg mqtt.Message) { logger.Log(2, "updating peers after node", currentNode.ID.String(), currentNode.Network, "detected connectivity issues") host, err := logic.GetHost(currentNode.HostID.String()) if err == nil { - if err = PublishSingleHostPeerUpdate(host); err != nil { + if err = PublishSingleHostPeerUpdate(host, nil); err != nil { logger.Log(0, "failed to publish update after failover peer change for node", currentNode.ID.String(), currentNode.Network) } } - } logger.Log(1, "updated node metrics", id) diff --git a/mq/publishers.go b/mq/publishers.go index 63dc093f..c641e0de 100644 --- a/mq/publishers.go +++ b/mq/publishers.go @@ -25,7 +25,7 @@ func PublishPeerUpdate() error { } for _, host := range hosts { host := host - err = PublishSingleHostPeerUpdate(&host) + err = PublishSingleHostPeerUpdate(&host, nil) if err != nil { logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error()) } @@ -33,10 +33,31 @@ func PublishPeerUpdate() error { return err } -// PublishSingleHostPeerUpdate --- determines and publishes a peer update to one host -func PublishSingleHostPeerUpdate(host *models.Host) error { +// PublishDeletedNodePeerUpdate --- determines and publishes a peer update +// to all the hosts with a deleted node to account for +func PublishDeletedNodePeerUpdate(delNode *models.Node) error { + if !servercfg.IsMessageQueueBackend() { + return nil + } - peerUpdate, err := logic.GetPeerUpdateForHost("", host) + hosts, err := logic.GetAllHosts() + if err != nil { + logger.Log(1, "err getting all hosts", err.Error()) + return err + } + for _, host := range hosts { + host := host + if err = PublishSingleHostPeerUpdate(&host, delNode); err != nil { + logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error()) + } + } + return err +} + +// PublishSingleHostPeerUpdate --- determines and publishes a peer update to one host +func PublishSingleHostPeerUpdate(host *models.Host, deletedNode *models.Node) error { + + peerUpdate, err := logic.GetPeerUpdateForHost("", host, deletedNode) if err != nil { return err } @@ -403,7 +424,7 @@ func sendPeers() { if force { host := host logger.Log(2, "sending scheduled peer update (5 min)") - err = PublishSingleHostPeerUpdate(&host) + err = PublishSingleHostPeerUpdate(&host, nil) if err != nil { logger.Log(1, "error publishing peer updates for host: ", host.ID.String(), " Err: ", err.Error()) }