diff --git a/logic/peers.go b/logic/peers.go index b2b45b20..68106ab3 100644 --- a/logic/peers.go +++ b/logic/peers.go @@ -363,7 +363,7 @@ func GetPeerUpdateForHost(network string, host *models.Host, deletedNode *models if err != nil { continue } - if _, ok := hostPeerUpdate.HostPeerIDs[delHost.PublicKey.String()]; !ok { + if _, ok := peerIndexMap[delHost.PublicKey.String()]; !ok { var peerConfig = wgtypes.PeerConfig{} peerConfig.PublicKey = delHost.PublicKey peerConfig.Endpoint = &net.UDPAddr{ diff --git a/mq/handlers.go b/mq/handlers.go index 379fbfc2..92863147 100644 --- a/mq/handlers.go +++ b/mq/handlers.go @@ -22,60 +22,177 @@ func DefaultHandler(client mqtt.Client, msg mqtt.Message) { // Ping message Handler -- handles ping topic from client nodes func Ping(client mqtt.Client, msg mqtt.Message) { - go func() { - id, err := getID(msg.Topic()) + id, err := getID(msg.Topic()) + if err != nil { + logger.Log(0, "error getting node.ID sent on ping topic ") + return + } + node, err := logic.GetNodeByID(id) + if err != nil { + logger.Log(3, "mq-ping error getting node: ", err.Error()) + _, err := database.FetchRecord(database.NODES_TABLE_NAME, id) if err != nil { - logger.Log(0, "error getting node.ID sent on ping topic ") - return - } - node, err := logic.GetNodeByID(id) - if err != nil { - logger.Log(3, "mq-ping error getting node: ", err.Error()) - record, err := database.FetchRecord(database.NODES_TABLE_NAME, id) - if err != nil { - logger.Log(3, "error reading database ", err.Error()) - return - } - logger.Log(3, "record from database") - logger.Log(3, record) - return - } - decrypted, decryptErr := decryptMsg(&node, msg.Payload()) - if decryptErr != nil { - logger.Log(0, "error decrypting when updating node ", node.ID.String(), decryptErr.Error()) - return - } - var checkin models.NodeCheckin - if err := json.Unmarshal(decrypted, &checkin); err != nil { - logger.Log(1, "error unmarshaling payload ", err.Error()) - return - } - host, err := logic.GetHost(node.HostID.String()) - if err != nil { - logger.Log(0, "error retrieving host for node ", node.ID.String(), err.Error()) - return - } - node.SetLastCheckIn() - host.Version = checkin.Version - node.Connected = checkin.Connected - host.Interfaces = checkin.Ifaces - for i := range host.Interfaces { - host.Interfaces[i].AddressString = host.Interfaces[i].Address.String() - } - if err := logic.UpdateNode(&node, &node); err != nil { - logger.Log(0, "error updating node", node.ID.String(), " on checkin", err.Error()) + logger.Log(3, "error reading database", err.Error()) return } - logger.Log(3, "ping processed for node", node.ID.String()) - // --TODO --set client version once feature is implemented. - //node.SetClientVersion(msg.Payload()) - }() + return + } + decrypted, decryptErr := decryptMsg(&node, msg.Payload()) + if decryptErr != nil { + logger.Log(0, "error decrypting when updating node ", node.ID.String(), decryptErr.Error()) + return + } + var checkin models.NodeCheckin + if err := json.Unmarshal(decrypted, &checkin); err != nil { + logger.Log(1, "error unmarshaling payload ", err.Error()) + return + } + host, err := logic.GetHost(node.HostID.String()) + if err != nil { + logger.Log(0, "error retrieving host for node ", node.ID.String(), err.Error()) + return + } + node.SetLastCheckIn() + host.Version = checkin.Version + node.Connected = checkin.Connected + host.Interfaces = checkin.Ifaces + for i := range host.Interfaces { + host.Interfaces[i].AddressString = host.Interfaces[i].Address.String() + } + if err := logic.UpdateNode(&node, &node); err != nil { + logger.Log(0, "error updating node", node.ID.String(), " on checkin", err.Error()) + return + } + + logger.Log(3, "ping processed for node", node.ID.String()) + // --TODO --set client version once feature is implemented. + //node.SetClientVersion(msg.Payload()) } // UpdateNode message Handler -- handles updates from client nodes func UpdateNode(client mqtt.Client, msg mqtt.Message) { - go func() { + id, err := getID(msg.Topic()) + if err != nil { + logger.Log(1, "error getting node.ID sent on ", msg.Topic(), err.Error()) + return + } + currentNode, err := logic.GetNodeByID(id) + if err != nil { + logger.Log(1, "error getting node ", id, err.Error()) + return + } + decrypted, decryptErr := decryptMsg(¤tNode, msg.Payload()) + if decryptErr != nil { + logger.Log(1, "failed to decrypt message for node ", id, decryptErr.Error()) + return + } + var newNode models.Node + if err := json.Unmarshal(decrypted, &newNode); err != nil { + logger.Log(1, "error unmarshaling payload ", err.Error()) + return + } + + ifaceDelta := logic.IfaceDelta(¤tNode, &newNode) + if servercfg.Is_EE && ifaceDelta { + if err = logic.EnterpriseResetAllPeersFailovers(currentNode.ID, currentNode.Network); err != nil { + logger.Log(1, "failed to reset failover list during node update", currentNode.ID.String(), currentNode.Network) + } + } + newNode.SetLastCheckIn() + if err := logic.UpdateNode(¤tNode, &newNode); err != nil { + logger.Log(1, "error saving node", err.Error()) + return + } + if ifaceDelta { // reduce number of unneeded updates, by only sending on iface changes + if err = PublishPeerUpdate(); err != nil { + logger.Log(0, "error updating peers when node", currentNode.ID.String(), "informed the server of an interface change", err.Error()) + } + } + + logger.Log(1, "updated node", id, newNode.ID.String()) +} + +// UpdateHost message Handler -- handles host updates from clients +func UpdateHost(client mqtt.Client, msg mqtt.Message) { + id, err := getID(msg.Topic()) + if err != nil { + logger.Log(1, "error getting host.ID sent on ", msg.Topic(), err.Error()) + return + } + currentHost, err := logic.GetHost(id) + if err != nil { + logger.Log(1, "error getting host ", id, err.Error()) + return + } + decrypted, decryptErr := decryptMsgWithHost(currentHost, msg.Payload()) + if decryptErr != nil { + logger.Log(1, "failed to decrypt message for host ", id, decryptErr.Error()) + return + } + var hostUpdate models.HostUpdate + if err := json.Unmarshal(decrypted, &hostUpdate); err != nil { + logger.Log(1, "error unmarshaling payload ", err.Error()) + return + } + logger.Log(3, fmt.Sprintf("recieved host update: %s\n", hostUpdate.Host.ID.String())) + var sendPeerUpdate bool + switch hostUpdate.Action { + case models.Acknowledgement: + hu := hostactions.GetAction(currentHost.ID.String()) + if hu != nil { + if err = HostUpdate(hu); err != nil { + logger.Log(0, "failed to send new node to host", hostUpdate.Host.Name, currentHost.ID.String(), err.Error()) + return + } else { + if err = PublishSingleHostPeerUpdate(currentHost, nil); err != nil { + logger.Log(0, "failed peers publish after join acknowledged", hostUpdate.Host.Name, currentHost.ID.String(), err.Error()) + return + } + } + } + case models.UpdateHost: + sendPeerUpdate = logic.UpdateHostFromClient(&hostUpdate.Host, currentHost) + err := logic.UpsertHost(currentHost) + if err != nil { + logger.Log(0, "failed to update host: ", currentHost.ID.String(), err.Error()) + return + } + case models.DeleteHost: + if servercfg.GetBrokerType() == servercfg.EmqxBrokerType { + // delete EMQX credentials for host + if err := DeleteEmqxUser(currentHost.ID.String()); err != nil { + logger.Log(0, "failed to remove host credentials from EMQX: ", currentHost.ID.String(), err.Error()) + return + } + } + if err := logic.DisassociateAllNodesFromHost(currentHost.ID.String()); err != nil { + logger.Log(0, "failed to delete all nodes of host: ", currentHost.ID.String(), err.Error()) + return + } + if err := logic.RemoveHostByID(currentHost.ID.String()); err != nil { + logger.Log(0, "failed to delete host: ", currentHost.ID.String(), err.Error()) + return + } + sendPeerUpdate = true + } + + if sendPeerUpdate { + err := PublishPeerUpdate() + if err != nil { + logger.Log(0, "failed to pulish peer update: ", err.Error()) + } + } + // if servercfg.Is_EE && ifaceDelta { + // if err = logic.EnterpriseResetAllPeersFailovers(currentHost.ID.String(), currentHost.Network); err != nil { + // logger.Log(1, "failed to reset failover list during node update", currentHost.ID.String(), currentHost.Network) + // } + // } +} + +// UpdateMetrics message Handler -- handles updates from client nodes for metrics +func UpdateMetrics(client mqtt.Client, msg mqtt.Message) { + if servercfg.Is_EE { id, err := getID(msg.Topic()) if err != nil { logger.Log(1, "error getting node.ID sent on ", msg.Topic(), err.Error()) @@ -91,209 +208,75 @@ func UpdateNode(client mqtt.Client, msg mqtt.Message) { logger.Log(1, "failed to decrypt message for node ", id, decryptErr.Error()) return } - var newNode models.Node - if err := json.Unmarshal(decrypted, &newNode); err != nil { + + var newMetrics models.Metrics + if err := json.Unmarshal(decrypted, &newMetrics); err != nil { logger.Log(1, "error unmarshaling payload ", err.Error()) return } - ifaceDelta := logic.IfaceDelta(¤tNode, &newNode) - if servercfg.Is_EE && ifaceDelta { - if err = logic.EnterpriseResetAllPeersFailovers(currentNode.ID, currentNode.Network); err != nil { - logger.Log(1, "failed to reset failover list during node update", currentNode.ID.String(), currentNode.Network) - } - } - newNode.SetLastCheckIn() - if err := logic.UpdateNode(¤tNode, &newNode); err != nil { - logger.Log(1, "error saving node", err.Error()) + shouldUpdate := updateNodeMetrics(¤tNode, &newMetrics) + + if err = logic.UpdateMetrics(id, &newMetrics); err != nil { + logger.Log(1, "faield to update node metrics", id, err.Error()) return } - if ifaceDelta { // reduce number of unneeded updates, by only sending on iface changes - if err = PublishPeerUpdate(); err != nil { - logger.Log(0, "error updating peers when node", currentNode.ID.String(), "informed the server of an interface change", err.Error()) + if servercfg.IsMetricsExporter() { + if err := pushMetricsToExporter(newMetrics); err != nil { + logger.Log(2, fmt.Sprintf("failed to push node: [%s] metrics to exporter, err: %v", + currentNode.ID, err)) } } - logger.Log(1, "updated node", id, newNode.ID.String()) - - }() -} - -// UpdateHost message Handler -- handles host updates from clients -func UpdateHost(client mqtt.Client, msg mqtt.Message) { - go func(msg mqtt.Message) { - id, err := getID(msg.Topic()) - if err != nil { - logger.Log(1, "error getting host.ID sent on ", msg.Topic(), err.Error()) - return - } - currentHost, err := logic.GetHost(id) - if err != nil { - logger.Log(1, "error getting host ", id, err.Error()) - return - } - decrypted, decryptErr := decryptMsgWithHost(currentHost, msg.Payload()) - if decryptErr != nil { - logger.Log(1, "failed to decrypt message for host ", id, decryptErr.Error()) - return - } - var hostUpdate models.HostUpdate - if err := json.Unmarshal(decrypted, &hostUpdate); err != nil { - logger.Log(1, "error unmarshaling payload ", err.Error()) - return - } - logger.Log(3, fmt.Sprintf("recieved host update: %s\n", hostUpdate.Host.ID.String())) - var sendPeerUpdate bool - switch hostUpdate.Action { - case models.Acknowledgement: - hu := hostactions.GetAction(currentHost.ID.String()) - if hu != nil { - if err = HostUpdate(hu); err != nil { - logger.Log(0, "failed to send new node to host", hostUpdate.Host.Name, currentHost.ID.String(), err.Error()) - return - } else { - if err = PublishSingleHostPeerUpdate(currentHost, nil); err != nil { - logger.Log(0, "failed peers publish after join acknowledged", hostUpdate.Host.Name, currentHost.ID.String(), err.Error()) - return - } - } - } - case models.UpdateHost: - sendPeerUpdate = logic.UpdateHostFromClient(&hostUpdate.Host, currentHost) - err := logic.UpsertHost(currentHost) + if newMetrics.Connectivity != nil { + err := logic.EnterpriseFailoverFunc(¤tNode) if err != nil { - logger.Log(0, "failed to update host: ", currentHost.ID.String(), err.Error()) - return - } - case models.DeleteHost: - if servercfg.GetBrokerType() == servercfg.EmqxBrokerType { - // delete EMQX credentials for host - if err := DeleteEmqxUser(currentHost.ID.String()); err != nil { - logger.Log(0, "failed to remove host credentials from EMQX: ", currentHost.ID.String(), err.Error()) - return - } - } - if err := logic.DisassociateAllNodesFromHost(currentHost.ID.String()); err != nil { - logger.Log(0, "failed to delete all nodes of host: ", currentHost.ID.String(), err.Error()) - return - } - if err := logic.RemoveHostByID(currentHost.ID.String()); err != nil { - logger.Log(0, "failed to delete host: ", currentHost.ID.String(), err.Error()) - return - } - sendPeerUpdate = true - } - - if sendPeerUpdate { - err := PublishPeerUpdate() - if err != nil { - logger.Log(0, "failed to pulish peer update: ", err.Error()) + logger.Log(0, "failed to failover for node", currentNode.ID.String(), "on network", currentNode.Network, "-", err.Error()) } } - // if servercfg.Is_EE && ifaceDelta { - // if err = logic.EnterpriseResetAllPeersFailovers(currentHost.ID.String(), currentHost.Network); err != nil { - // logger.Log(1, "failed to reset failover list during node update", currentHost.ID.String(), currentHost.Network) - // } - // } - }(msg) -} - -// UpdateMetrics message Handler -- handles updates from client nodes for metrics -func UpdateMetrics(client mqtt.Client, msg mqtt.Message) { - if servercfg.Is_EE { - go func() { - id, err := getID(msg.Topic()) - if err != nil { - logger.Log(1, "error getting node.ID sent on ", msg.Topic(), err.Error()) - return - } - currentNode, err := logic.GetNodeByID(id) - if err != nil { - logger.Log(1, "error getting node ", id, err.Error()) - return - } - decrypted, decryptErr := decryptMsg(¤tNode, msg.Payload()) - if decryptErr != nil { - logger.Log(1, "failed to decrypt message for node ", id, decryptErr.Error()) - return - } - - var newMetrics models.Metrics - if err := json.Unmarshal(decrypted, &newMetrics); err != nil { - logger.Log(1, "error unmarshaling payload ", err.Error()) - return - } - - shouldUpdate := updateNodeMetrics(¤tNode, &newMetrics) - - if err = logic.UpdateMetrics(id, &newMetrics); err != nil { - logger.Log(1, "faield to update node metrics", id, err.Error()) - return - } - if servercfg.IsMetricsExporter() { - if err := pushMetricsToExporter(newMetrics); err != nil { - logger.Log(2, fmt.Sprintf("failed to push node: [%s] metrics to exporter, err: %v", - currentNode.ID, err)) + if shouldUpdate { + logger.Log(2, "updating peers after node", currentNode.ID.String(), currentNode.Network, "detected connectivity issues") + host, err := logic.GetHost(currentNode.HostID.String()) + if err == nil { + if err = PublishSingleHostPeerUpdate(host, nil); err != nil { + logger.Log(0, "failed to publish update after failover peer change for node", currentNode.ID.String(), currentNode.Network) } } + } - if newMetrics.Connectivity != nil { - err := logic.EnterpriseFailoverFunc(¤tNode) - if err != nil { - logger.Log(0, "failed to failover for node", currentNode.ID.String(), "on network", currentNode.Network, "-", err.Error()) - } - } - - if shouldUpdate { - logger.Log(2, "updating peers after node", currentNode.ID.String(), currentNode.Network, "detected connectivity issues") - host, err := logic.GetHost(currentNode.HostID.String()) - if err == nil { - if err = PublishSingleHostPeerUpdate(host, nil); err != nil { - logger.Log(0, "failed to publish update after failover peer change for node", currentNode.ID.String(), currentNode.Network) - } - } - } - - logger.Log(1, "updated node metrics", id) - }() + logger.Log(1, "updated node metrics", id) } } // ClientPeerUpdate message handler -- handles updating peers after signal from client nodes func ClientPeerUpdate(client mqtt.Client, msg mqtt.Message) { - go func() { - id, err := getID(msg.Topic()) - if err != nil { - logger.Log(1, "error getting node.ID sent on ", msg.Topic(), err.Error()) - return - } - currentNode, err := logic.GetNodeByID(id) - if err != nil { - logger.Log(1, "error getting node ", id, err.Error()) - return - } - decrypted, decryptErr := decryptMsg(¤tNode, msg.Payload()) - if decryptErr != nil { - logger.Log(1, "failed to decrypt message during client peer update for node ", id, decryptErr.Error()) - return - } - switch decrypted[0] { - case ncutils.ACK: - //do we still need this - case ncutils.DONE: - updateNodePeers(¤tNode) - } - - logger.Log(1, "sent peer updates after signal received from", id) - }() -} - -func updateNodePeers(currentNode *models.Node) { - if err := PublishPeerUpdate(); err != nil { - logger.Log(1, "error publishing peer update ", err.Error()) + id, err := getID(msg.Topic()) + if err != nil { + logger.Log(1, "error getting node.ID sent on ", msg.Topic(), err.Error()) return } + currentNode, err := logic.GetNodeByID(id) + if err != nil { + logger.Log(1, "error getting node ", id, err.Error()) + return + } + decrypted, decryptErr := decryptMsg(¤tNode, msg.Payload()) + if decryptErr != nil { + logger.Log(1, "failed to decrypt message during client peer update for node ", id, decryptErr.Error()) + return + } + switch decrypted[0] { + case ncutils.ACK: + // do we still need this + case ncutils.DONE: + if err = PublishPeerUpdate(); err != nil { + logger.Log(1, "error publishing peer update for node", currentNode.ID.String(), err.Error()) + return + } + } + + logger.Log(1, "sent peer updates after signal received from", id) } func updateNodeMetrics(currentNode *models.Node, newMetrics *models.Metrics) bool {