diff --git a/mq/publishers.go b/mq/publishers.go index ffa2313e..8556d705 100644 --- a/mq/publishers.go +++ b/mq/publishers.go @@ -56,7 +56,7 @@ func PublishSingleHostUpdate(host *models.Host) error { return publish(host, fmt.Sprintf("peers/host/%s/%s", host.ID.String(), servercfg.GetServer()), data) } -// PublishPeerUpdate --- publishes a peer update to all the peers of a node +// PublishExtPeerUpdate --- publishes a peer update to all the peers of a node func PublishExtPeerUpdate(node *models.Node) error { go PublishPeerUpdate() @@ -111,40 +111,6 @@ func HostUpdate(hostUpdate *models.HostUpdate) error { return nil } -// sendPeers - retrieve networks, send peer ports to all peers -func sendPeers() { - - hosts, err := logic.GetAllHosts() - if err != nil { - logger.Log(1, "error retrieving networks for keepalive", err.Error()) - } - - var force bool - peer_force_send++ - if peer_force_send == 5 { - servercfg.SetHost() - force = true - peer_force_send = 0 - err := logic.TimerCheckpoint() // run telemetry & log dumps if 24 hours has passed.. - if err != nil { - logger.Log(3, "error occurred on timer,", err.Error()) - } - - //collectServerMetrics(networks[:]) - } - - for _, host := range hosts { - if force { - host := host - logger.Log(2, "sending scheduled peer update (5 min)") - err = PublishSingleHostUpdate(&host) - if err != nil { - logger.Log(1, "error publishing peer updates for host: ", host.ID.String(), " Err: ", err.Error()) - } - } - } -} - // ServerStartNotify - notifies all non server nodes to pull changes after a restart func ServerStartNotify() error { nodes, err := logic.GetAllNodes() @@ -188,58 +154,13 @@ func PublishDNSUpdate(network string, dns models.DNSUpdate) error { // PublishAllDNS publishes an array of dns updates (ip / host.network) for each peer to a node joining a network func PublishAllDNS(newnode *models.Node) error { alldns := []models.DNSUpdate{} - dns := models.DNSUpdate{} newnodeHost, err := logic.GetHost(newnode.HostID.String()) if err != nil { return fmt.Errorf("error retrieving host for dns update %w", err) } - nodes, err := logic.GetNetworkNodes(newnode.Network) - if err != nil { - return err - } - for _, node := range nodes { - host, err := logic.GetHost(node.HostID.String()) - if err != nil { - logger.Log(0, "error retrieving host for dns update", host.ID.String(), err.Error()) - continue - } - dns.Action = models.DNSInsert - dns.Name = host.Name + "." + node.Network - if node.Address.IP != nil { - dns.Address = node.Address.IP.String() - alldns = append(alldns, dns) - } - if node.Address6.IP != nil { - dns.Address = node.Address6.IP.String() - alldns = append(alldns, dns) - } - } - clients, err := logic.GetNetworkExtClients(newnode.Network) - if err != nil { - logger.Log(0, "error retrieving extclients", err.Error()) - } - for _, client := range clients { - dns.Action = models.DNSInsert - dns.Name = client.ClientID + "." + client.Network - if client.Address != "" { - dns.Address = client.Address - alldns = append(alldns, dns) - } - if client.Address6 != "" { - dns.Address = client.Address - alldns = append(alldns, dns) - } - } - customdns, err := logic.GetCustomDNS(newnode.Network) - if err != nil { - logger.Log(0, "error retrieving custom dns entries", err.Error()) - } - for _, custom := range customdns { - dns.Action = models.DNSInsert - dns.Address = custom.Address - dns.Name = custom.Name + "." + custom.Network - alldns = append(alldns, dns) - } + alldns = append(alldns, getNodeDNS(newnode.Network)...) + alldns = append(alldns, getExtClientDNS(newnode.Network)...) + alldns = append(alldns, getCustomDNS(newnode.Network)...) data, err := json.Marshal(alldns) if err != nil { return fmt.Errorf("error encoding dns data %w", err) @@ -272,7 +193,7 @@ func PublishDNSDelete(node *models.Node, host *models.Host) error { return nil } -// PublishReplaceNDS publish a dns update to replace a dns entry on all hosts in network +// PublishReplaceDNS publish a dns update to replace a dns entry on all hosts in network func PublishReplaceDNS(oldNode, newNode *models.Node, host *models.Host) error { dns := models.DNSUpdate{ Action: models.DNSReplaceIP, @@ -322,7 +243,7 @@ func PublishExtCLientDNS(client *models.ExtClient) error { return nil } -// PublishExtClientUpdate publishes dns update for extclient name change +// PublishExtClientDNSUpdate update for extclient name change func PublishExtClientDNSUpdate(old, new models.ExtClient, network string) error { dns := models.DNSUpdate{ Action: models.DNSReplaceName, @@ -335,7 +256,7 @@ func PublishExtClientDNSUpdate(old, new models.ExtClient, network string) error return nil } -// PublishDeleteExtClient publish dns update to delete extclient entry +// PublishDeleteExtClientDNS publish dns update to delete extclient entry func PublishDeleteExtClientDNS(client *models.ExtClient) error { dns := models.DNSUpdate{ Action: models.DNSDeleteByName, @@ -380,49 +301,6 @@ func PublishHostDNSUpdate(old, new *models.Host, networks []string) error { return nil } -// function to collect and store metrics for server nodes -//func collectServerMetrics(networks []models.Network) { -// if !servercfg.Is_EE { -// return -// } -// if len(networks) > 0 { -// for i := range networks { -// currentNetworkNodes, err := logic.GetNetworkNodes(networks[i].NetID) -// if err != nil { -// continue -// } -// currentServerNodes := logic.GetServerNodes(networks[i].NetID) -// if len(currentServerNodes) > 0 { -// for i := range currentServerNodes { -// if logic.IsLocalServer(¤tServerNodes[i]) { -// serverMetrics := logic.CollectServerMetrics(currentServerNodes[i].ID, currentNetworkNodes) -// if serverMetrics != nil { -// serverMetrics.NodeName = currentServerNodes[i].Name -// serverMetrics.NodeID = currentServerNodes[i].ID -// serverMetrics.IsServer = "yes" -// serverMetrics.Network = currentServerNodes[i].Network -// if err = metrics.GetExchangedBytesForNode(¤tServerNodes[i], serverMetrics); err != nil { -// logger.Log(1, fmt.Sprintf("failed to update exchanged bytes info for server: %s, err: %v", -// currentServerNodes[i].Name, err)) -// } -// updateNodeMetrics(¤tServerNodes[i], serverMetrics) -// if err = logic.UpdateMetrics(currentServerNodes[i].ID, serverMetrics); err != nil { -// logger.Log(1, "failed to update metrics for server node", currentServerNodes[i].ID) -// } -// if servercfg.IsMetricsExporter() { -// logger.Log(2, "-------------> SERVER METRICS: ", fmt.Sprintf("%+v", serverMetrics)) -// if err := pushMetricsToExporter(*serverMetrics); err != nil { -// logger.Log(2, "failed to push server metrics to exporter: ", err.Error()) -// } -// } -// } -// } -// } -// } -// } -// } -//} - func pushMetricsToExporter(metrics models.Metrics) error { logger.Log(2, "----> Pushing metrics to exporter") data, err := json.Marshal(metrics) @@ -440,3 +318,102 @@ func pushMetricsToExporter(metrics models.Metrics) error { } return nil } + +func getNodeDNS(network string) []models.DNSUpdate { + alldns := []models.DNSUpdate{} + dns := models.DNSUpdate{} + nodes, err := logic.GetNetworkNodes(network) + if err != nil { + logger.Log(0, "error retreiving network nodes for network", network, err.Error()) + } + for _, node := range nodes { + host, err := logic.GetHost(node.HostID.String()) + if err != nil { + logger.Log(0, "error retrieving host for dns update", host.ID.String(), err.Error()) + continue + } + dns.Action = models.DNSInsert + dns.Name = host.Name + "." + node.Network + if node.Address.IP != nil { + dns.Address = node.Address.IP.String() + alldns = append(alldns, dns) + } + if node.Address6.IP != nil { + dns.Address = node.Address6.IP.String() + alldns = append(alldns, dns) + } + } + return alldns +} + +func getExtClientDNS(network string) []models.DNSUpdate { + alldns := []models.DNSUpdate{} + dns := models.DNSUpdate{} + clients, err := logic.GetNetworkExtClients(network) + if err != nil { + logger.Log(0, "error retrieving extclients", err.Error()) + } + for _, client := range clients { + dns.Action = models.DNSInsert + dns.Name = client.ClientID + "." + client.Network + if client.Address != "" { + dns.Address = client.Address + alldns = append(alldns, dns) + } + if client.Address6 != "" { + dns.Address = client.Address + alldns = append(alldns, dns) + } + } + return alldns +} + +func getCustomDNS(network string) []models.DNSUpdate { + alldns := []models.DNSUpdate{} + dns := models.DNSUpdate{} + customdns, err := logic.GetCustomDNS(network) + if err != nil { + logger.Log(0, "error retrieving custom dns entries", err.Error()) + } + for _, custom := range customdns { + dns.Action = models.DNSInsert + dns.Address = custom.Address + dns.Name = custom.Name + "." + custom.Network + alldns = append(alldns, dns) + } + return alldns +} + +// sendPeers - retrieve networks, send peer ports to all peers +func sendPeers() { + + hosts, err := logic.GetAllHosts() + if err != nil { + logger.Log(1, "error retrieving networks for keepalive", err.Error()) + } + + var force bool + peer_force_send++ + if peer_force_send == 5 { + servercfg.SetHost() + force = true + peer_force_send = 0 + err := logic.TimerCheckpoint() // run telemetry & log dumps if 24 hours has passed.. + if err != nil { + logger.Log(3, "error occurred on timer,", err.Error()) + } + + //collectServerMetrics(networks[:]) + } + + for _, host := range hosts { + if force { + host := host + logger.Log(2, "sending scheduled peer update (5 min)") + err = PublishSingleHostUpdate(&host) + if err != nil { + logger.Log(1, "error publishing peer updates for host: ", host.ID.String(), " Err: ", err.Error()) + } + } + } +}