mirror of
https://github.com/gravitl/netmaker.git
synced 2024-09-20 15:26:04 +08:00
cleanup unnecessary routines + use peerindexmap for delete check
This commit is contained in:
parent
d19f292e6c
commit
385232ca5a
|
@ -363,7 +363,7 @@ func GetPeerUpdateForHost(network string, host *models.Host, deletedNode *models
|
|||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if _, ok := hostPeerUpdate.HostPeerIDs[delHost.PublicKey.String()]; !ok {
|
||||
if _, ok := peerIndexMap[delHost.PublicKey.String()]; !ok {
|
||||
var peerConfig = wgtypes.PeerConfig{}
|
||||
peerConfig.PublicKey = delHost.PublicKey
|
||||
peerConfig.Endpoint = &net.UDPAddr{
|
||||
|
|
435
mq/handlers.go
435
mq/handlers.go
|
@ -22,60 +22,177 @@ func DefaultHandler(client mqtt.Client, msg mqtt.Message) {
|
|||
|
||||
// Ping message Handler -- handles ping topic from client nodes
|
||||
func Ping(client mqtt.Client, msg mqtt.Message) {
|
||||
go func() {
|
||||
id, err := getID(msg.Topic())
|
||||
id, err := getID(msg.Topic())
|
||||
if err != nil {
|
||||
logger.Log(0, "error getting node.ID sent on ping topic ")
|
||||
return
|
||||
}
|
||||
node, err := logic.GetNodeByID(id)
|
||||
if err != nil {
|
||||
logger.Log(3, "mq-ping error getting node: ", err.Error())
|
||||
_, err := database.FetchRecord(database.NODES_TABLE_NAME, id)
|
||||
if err != nil {
|
||||
logger.Log(0, "error getting node.ID sent on ping topic ")
|
||||
return
|
||||
}
|
||||
node, err := logic.GetNodeByID(id)
|
||||
if err != nil {
|
||||
logger.Log(3, "mq-ping error getting node: ", err.Error())
|
||||
record, err := database.FetchRecord(database.NODES_TABLE_NAME, id)
|
||||
if err != nil {
|
||||
logger.Log(3, "error reading database ", err.Error())
|
||||
return
|
||||
}
|
||||
logger.Log(3, "record from database")
|
||||
logger.Log(3, record)
|
||||
return
|
||||
}
|
||||
decrypted, decryptErr := decryptMsg(&node, msg.Payload())
|
||||
if decryptErr != nil {
|
||||
logger.Log(0, "error decrypting when updating node ", node.ID.String(), decryptErr.Error())
|
||||
return
|
||||
}
|
||||
var checkin models.NodeCheckin
|
||||
if err := json.Unmarshal(decrypted, &checkin); err != nil {
|
||||
logger.Log(1, "error unmarshaling payload ", err.Error())
|
||||
return
|
||||
}
|
||||
host, err := logic.GetHost(node.HostID.String())
|
||||
if err != nil {
|
||||
logger.Log(0, "error retrieving host for node ", node.ID.String(), err.Error())
|
||||
return
|
||||
}
|
||||
node.SetLastCheckIn()
|
||||
host.Version = checkin.Version
|
||||
node.Connected = checkin.Connected
|
||||
host.Interfaces = checkin.Ifaces
|
||||
for i := range host.Interfaces {
|
||||
host.Interfaces[i].AddressString = host.Interfaces[i].Address.String()
|
||||
}
|
||||
if err := logic.UpdateNode(&node, &node); err != nil {
|
||||
logger.Log(0, "error updating node", node.ID.String(), " on checkin", err.Error())
|
||||
logger.Log(3, "error reading database", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
logger.Log(3, "ping processed for node", node.ID.String())
|
||||
// --TODO --set client version once feature is implemented.
|
||||
//node.SetClientVersion(msg.Payload())
|
||||
}()
|
||||
return
|
||||
}
|
||||
decrypted, decryptErr := decryptMsg(&node, msg.Payload())
|
||||
if decryptErr != nil {
|
||||
logger.Log(0, "error decrypting when updating node ", node.ID.String(), decryptErr.Error())
|
||||
return
|
||||
}
|
||||
var checkin models.NodeCheckin
|
||||
if err := json.Unmarshal(decrypted, &checkin); err != nil {
|
||||
logger.Log(1, "error unmarshaling payload ", err.Error())
|
||||
return
|
||||
}
|
||||
host, err := logic.GetHost(node.HostID.String())
|
||||
if err != nil {
|
||||
logger.Log(0, "error retrieving host for node ", node.ID.String(), err.Error())
|
||||
return
|
||||
}
|
||||
node.SetLastCheckIn()
|
||||
host.Version = checkin.Version
|
||||
node.Connected = checkin.Connected
|
||||
host.Interfaces = checkin.Ifaces
|
||||
for i := range host.Interfaces {
|
||||
host.Interfaces[i].AddressString = host.Interfaces[i].Address.String()
|
||||
}
|
||||
if err := logic.UpdateNode(&node, &node); err != nil {
|
||||
logger.Log(0, "error updating node", node.ID.String(), " on checkin", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
logger.Log(3, "ping processed for node", node.ID.String())
|
||||
// --TODO --set client version once feature is implemented.
|
||||
//node.SetClientVersion(msg.Payload())
|
||||
}
|
||||
|
||||
// UpdateNode message Handler -- handles updates from client nodes
|
||||
func UpdateNode(client mqtt.Client, msg mqtt.Message) {
|
||||
go func() {
|
||||
id, err := getID(msg.Topic())
|
||||
if err != nil {
|
||||
logger.Log(1, "error getting node.ID sent on ", msg.Topic(), err.Error())
|
||||
return
|
||||
}
|
||||
currentNode, err := logic.GetNodeByID(id)
|
||||
if err != nil {
|
||||
logger.Log(1, "error getting node ", id, err.Error())
|
||||
return
|
||||
}
|
||||
decrypted, decryptErr := decryptMsg(¤tNode, msg.Payload())
|
||||
if decryptErr != nil {
|
||||
logger.Log(1, "failed to decrypt message for node ", id, decryptErr.Error())
|
||||
return
|
||||
}
|
||||
var newNode models.Node
|
||||
if err := json.Unmarshal(decrypted, &newNode); err != nil {
|
||||
logger.Log(1, "error unmarshaling payload ", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
ifaceDelta := logic.IfaceDelta(¤tNode, &newNode)
|
||||
if servercfg.Is_EE && ifaceDelta {
|
||||
if err = logic.EnterpriseResetAllPeersFailovers(currentNode.ID, currentNode.Network); err != nil {
|
||||
logger.Log(1, "failed to reset failover list during node update", currentNode.ID.String(), currentNode.Network)
|
||||
}
|
||||
}
|
||||
newNode.SetLastCheckIn()
|
||||
if err := logic.UpdateNode(¤tNode, &newNode); err != nil {
|
||||
logger.Log(1, "error saving node", err.Error())
|
||||
return
|
||||
}
|
||||
if ifaceDelta { // reduce number of unneeded updates, by only sending on iface changes
|
||||
if err = PublishPeerUpdate(); err != nil {
|
||||
logger.Log(0, "error updating peers when node", currentNode.ID.String(), "informed the server of an interface change", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
logger.Log(1, "updated node", id, newNode.ID.String())
|
||||
}
|
||||
|
||||
// UpdateHost message Handler -- handles host updates from clients
|
||||
func UpdateHost(client mqtt.Client, msg mqtt.Message) {
|
||||
id, err := getID(msg.Topic())
|
||||
if err != nil {
|
||||
logger.Log(1, "error getting host.ID sent on ", msg.Topic(), err.Error())
|
||||
return
|
||||
}
|
||||
currentHost, err := logic.GetHost(id)
|
||||
if err != nil {
|
||||
logger.Log(1, "error getting host ", id, err.Error())
|
||||
return
|
||||
}
|
||||
decrypted, decryptErr := decryptMsgWithHost(currentHost, msg.Payload())
|
||||
if decryptErr != nil {
|
||||
logger.Log(1, "failed to decrypt message for host ", id, decryptErr.Error())
|
||||
return
|
||||
}
|
||||
var hostUpdate models.HostUpdate
|
||||
if err := json.Unmarshal(decrypted, &hostUpdate); err != nil {
|
||||
logger.Log(1, "error unmarshaling payload ", err.Error())
|
||||
return
|
||||
}
|
||||
logger.Log(3, fmt.Sprintf("recieved host update: %s\n", hostUpdate.Host.ID.String()))
|
||||
var sendPeerUpdate bool
|
||||
switch hostUpdate.Action {
|
||||
case models.Acknowledgement:
|
||||
hu := hostactions.GetAction(currentHost.ID.String())
|
||||
if hu != nil {
|
||||
if err = HostUpdate(hu); err != nil {
|
||||
logger.Log(0, "failed to send new node to host", hostUpdate.Host.Name, currentHost.ID.String(), err.Error())
|
||||
return
|
||||
} else {
|
||||
if err = PublishSingleHostPeerUpdate(currentHost, nil); err != nil {
|
||||
logger.Log(0, "failed peers publish after join acknowledged", hostUpdate.Host.Name, currentHost.ID.String(), err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
case models.UpdateHost:
|
||||
sendPeerUpdate = logic.UpdateHostFromClient(&hostUpdate.Host, currentHost)
|
||||
err := logic.UpsertHost(currentHost)
|
||||
if err != nil {
|
||||
logger.Log(0, "failed to update host: ", currentHost.ID.String(), err.Error())
|
||||
return
|
||||
}
|
||||
case models.DeleteHost:
|
||||
if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
|
||||
// delete EMQX credentials for host
|
||||
if err := DeleteEmqxUser(currentHost.ID.String()); err != nil {
|
||||
logger.Log(0, "failed to remove host credentials from EMQX: ", currentHost.ID.String(), err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
if err := logic.DisassociateAllNodesFromHost(currentHost.ID.String()); err != nil {
|
||||
logger.Log(0, "failed to delete all nodes of host: ", currentHost.ID.String(), err.Error())
|
||||
return
|
||||
}
|
||||
if err := logic.RemoveHostByID(currentHost.ID.String()); err != nil {
|
||||
logger.Log(0, "failed to delete host: ", currentHost.ID.String(), err.Error())
|
||||
return
|
||||
}
|
||||
sendPeerUpdate = true
|
||||
}
|
||||
|
||||
if sendPeerUpdate {
|
||||
err := PublishPeerUpdate()
|
||||
if err != nil {
|
||||
logger.Log(0, "failed to pulish peer update: ", err.Error())
|
||||
}
|
||||
}
|
||||
// if servercfg.Is_EE && ifaceDelta {
|
||||
// if err = logic.EnterpriseResetAllPeersFailovers(currentHost.ID.String(), currentHost.Network); err != nil {
|
||||
// logger.Log(1, "failed to reset failover list during node update", currentHost.ID.String(), currentHost.Network)
|
||||
// }
|
||||
// }
|
||||
}
|
||||
|
||||
// UpdateMetrics message Handler -- handles updates from client nodes for metrics
|
||||
func UpdateMetrics(client mqtt.Client, msg mqtt.Message) {
|
||||
if servercfg.Is_EE {
|
||||
id, err := getID(msg.Topic())
|
||||
if err != nil {
|
||||
logger.Log(1, "error getting node.ID sent on ", msg.Topic(), err.Error())
|
||||
|
@ -91,209 +208,75 @@ func UpdateNode(client mqtt.Client, msg mqtt.Message) {
|
|||
logger.Log(1, "failed to decrypt message for node ", id, decryptErr.Error())
|
||||
return
|
||||
}
|
||||
var newNode models.Node
|
||||
if err := json.Unmarshal(decrypted, &newNode); err != nil {
|
||||
|
||||
var newMetrics models.Metrics
|
||||
if err := json.Unmarshal(decrypted, &newMetrics); err != nil {
|
||||
logger.Log(1, "error unmarshaling payload ", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
ifaceDelta := logic.IfaceDelta(¤tNode, &newNode)
|
||||
if servercfg.Is_EE && ifaceDelta {
|
||||
if err = logic.EnterpriseResetAllPeersFailovers(currentNode.ID, currentNode.Network); err != nil {
|
||||
logger.Log(1, "failed to reset failover list during node update", currentNode.ID.String(), currentNode.Network)
|
||||
}
|
||||
}
|
||||
newNode.SetLastCheckIn()
|
||||
if err := logic.UpdateNode(¤tNode, &newNode); err != nil {
|
||||
logger.Log(1, "error saving node", err.Error())
|
||||
shouldUpdate := updateNodeMetrics(¤tNode, &newMetrics)
|
||||
|
||||
if err = logic.UpdateMetrics(id, &newMetrics); err != nil {
|
||||
logger.Log(1, "faield to update node metrics", id, err.Error())
|
||||
return
|
||||
}
|
||||
if ifaceDelta { // reduce number of unneeded updates, by only sending on iface changes
|
||||
if err = PublishPeerUpdate(); err != nil {
|
||||
logger.Log(0, "error updating peers when node", currentNode.ID.String(), "informed the server of an interface change", err.Error())
|
||||
if servercfg.IsMetricsExporter() {
|
||||
if err := pushMetricsToExporter(newMetrics); err != nil {
|
||||
logger.Log(2, fmt.Sprintf("failed to push node: [%s] metrics to exporter, err: %v",
|
||||
currentNode.ID, err))
|
||||
}
|
||||
}
|
||||
|
||||
logger.Log(1, "updated node", id, newNode.ID.String())
|
||||
|
||||
}()
|
||||
}
|
||||
|
||||
// UpdateHost message Handler -- handles host updates from clients
|
||||
func UpdateHost(client mqtt.Client, msg mqtt.Message) {
|
||||
go func(msg mqtt.Message) {
|
||||
id, err := getID(msg.Topic())
|
||||
if err != nil {
|
||||
logger.Log(1, "error getting host.ID sent on ", msg.Topic(), err.Error())
|
||||
return
|
||||
}
|
||||
currentHost, err := logic.GetHost(id)
|
||||
if err != nil {
|
||||
logger.Log(1, "error getting host ", id, err.Error())
|
||||
return
|
||||
}
|
||||
decrypted, decryptErr := decryptMsgWithHost(currentHost, msg.Payload())
|
||||
if decryptErr != nil {
|
||||
logger.Log(1, "failed to decrypt message for host ", id, decryptErr.Error())
|
||||
return
|
||||
}
|
||||
var hostUpdate models.HostUpdate
|
||||
if err := json.Unmarshal(decrypted, &hostUpdate); err != nil {
|
||||
logger.Log(1, "error unmarshaling payload ", err.Error())
|
||||
return
|
||||
}
|
||||
logger.Log(3, fmt.Sprintf("recieved host update: %s\n", hostUpdate.Host.ID.String()))
|
||||
var sendPeerUpdate bool
|
||||
switch hostUpdate.Action {
|
||||
case models.Acknowledgement:
|
||||
hu := hostactions.GetAction(currentHost.ID.String())
|
||||
if hu != nil {
|
||||
if err = HostUpdate(hu); err != nil {
|
||||
logger.Log(0, "failed to send new node to host", hostUpdate.Host.Name, currentHost.ID.String(), err.Error())
|
||||
return
|
||||
} else {
|
||||
if err = PublishSingleHostPeerUpdate(currentHost, nil); err != nil {
|
||||
logger.Log(0, "failed peers publish after join acknowledged", hostUpdate.Host.Name, currentHost.ID.String(), err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
case models.UpdateHost:
|
||||
sendPeerUpdate = logic.UpdateHostFromClient(&hostUpdate.Host, currentHost)
|
||||
err := logic.UpsertHost(currentHost)
|
||||
if newMetrics.Connectivity != nil {
|
||||
err := logic.EnterpriseFailoverFunc(¤tNode)
|
||||
if err != nil {
|
||||
logger.Log(0, "failed to update host: ", currentHost.ID.String(), err.Error())
|
||||
return
|
||||
}
|
||||
case models.DeleteHost:
|
||||
if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
|
||||
// delete EMQX credentials for host
|
||||
if err := DeleteEmqxUser(currentHost.ID.String()); err != nil {
|
||||
logger.Log(0, "failed to remove host credentials from EMQX: ", currentHost.ID.String(), err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
if err := logic.DisassociateAllNodesFromHost(currentHost.ID.String()); err != nil {
|
||||
logger.Log(0, "failed to delete all nodes of host: ", currentHost.ID.String(), err.Error())
|
||||
return
|
||||
}
|
||||
if err := logic.RemoveHostByID(currentHost.ID.String()); err != nil {
|
||||
logger.Log(0, "failed to delete host: ", currentHost.ID.String(), err.Error())
|
||||
return
|
||||
}
|
||||
sendPeerUpdate = true
|
||||
}
|
||||
|
||||
if sendPeerUpdate {
|
||||
err := PublishPeerUpdate()
|
||||
if err != nil {
|
||||
logger.Log(0, "failed to pulish peer update: ", err.Error())
|
||||
logger.Log(0, "failed to failover for node", currentNode.ID.String(), "on network", currentNode.Network, "-", err.Error())
|
||||
}
|
||||
}
|
||||
// if servercfg.Is_EE && ifaceDelta {
|
||||
// if err = logic.EnterpriseResetAllPeersFailovers(currentHost.ID.String(), currentHost.Network); err != nil {
|
||||
// logger.Log(1, "failed to reset failover list during node update", currentHost.ID.String(), currentHost.Network)
|
||||
// }
|
||||
// }
|
||||
|
||||
}(msg)
|
||||
}
|
||||
|
||||
// UpdateMetrics message Handler -- handles updates from client nodes for metrics
|
||||
func UpdateMetrics(client mqtt.Client, msg mqtt.Message) {
|
||||
if servercfg.Is_EE {
|
||||
go func() {
|
||||
id, err := getID(msg.Topic())
|
||||
if err != nil {
|
||||
logger.Log(1, "error getting node.ID sent on ", msg.Topic(), err.Error())
|
||||
return
|
||||
}
|
||||
currentNode, err := logic.GetNodeByID(id)
|
||||
if err != nil {
|
||||
logger.Log(1, "error getting node ", id, err.Error())
|
||||
return
|
||||
}
|
||||
decrypted, decryptErr := decryptMsg(¤tNode, msg.Payload())
|
||||
if decryptErr != nil {
|
||||
logger.Log(1, "failed to decrypt message for node ", id, decryptErr.Error())
|
||||
return
|
||||
}
|
||||
|
||||
var newMetrics models.Metrics
|
||||
if err := json.Unmarshal(decrypted, &newMetrics); err != nil {
|
||||
logger.Log(1, "error unmarshaling payload ", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
shouldUpdate := updateNodeMetrics(¤tNode, &newMetrics)
|
||||
|
||||
if err = logic.UpdateMetrics(id, &newMetrics); err != nil {
|
||||
logger.Log(1, "faield to update node metrics", id, err.Error())
|
||||
return
|
||||
}
|
||||
if servercfg.IsMetricsExporter() {
|
||||
if err := pushMetricsToExporter(newMetrics); err != nil {
|
||||
logger.Log(2, fmt.Sprintf("failed to push node: [%s] metrics to exporter, err: %v",
|
||||
currentNode.ID, err))
|
||||
if shouldUpdate {
|
||||
logger.Log(2, "updating peers after node", currentNode.ID.String(), currentNode.Network, "detected connectivity issues")
|
||||
host, err := logic.GetHost(currentNode.HostID.String())
|
||||
if err == nil {
|
||||
if err = PublishSingleHostPeerUpdate(host, nil); err != nil {
|
||||
logger.Log(0, "failed to publish update after failover peer change for node", currentNode.ID.String(), currentNode.Network)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if newMetrics.Connectivity != nil {
|
||||
err := logic.EnterpriseFailoverFunc(¤tNode)
|
||||
if err != nil {
|
||||
logger.Log(0, "failed to failover for node", currentNode.ID.String(), "on network", currentNode.Network, "-", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
if shouldUpdate {
|
||||
logger.Log(2, "updating peers after node", currentNode.ID.String(), currentNode.Network, "detected connectivity issues")
|
||||
host, err := logic.GetHost(currentNode.HostID.String())
|
||||
if err == nil {
|
||||
if err = PublishSingleHostPeerUpdate(host, nil); err != nil {
|
||||
logger.Log(0, "failed to publish update after failover peer change for node", currentNode.ID.String(), currentNode.Network)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
logger.Log(1, "updated node metrics", id)
|
||||
}()
|
||||
logger.Log(1, "updated node metrics", id)
|
||||
}
|
||||
}
|
||||
|
||||
// ClientPeerUpdate message handler -- handles updating peers after signal from client nodes
|
||||
func ClientPeerUpdate(client mqtt.Client, msg mqtt.Message) {
|
||||
go func() {
|
||||
id, err := getID(msg.Topic())
|
||||
if err != nil {
|
||||
logger.Log(1, "error getting node.ID sent on ", msg.Topic(), err.Error())
|
||||
return
|
||||
}
|
||||
currentNode, err := logic.GetNodeByID(id)
|
||||
if err != nil {
|
||||
logger.Log(1, "error getting node ", id, err.Error())
|
||||
return
|
||||
}
|
||||
decrypted, decryptErr := decryptMsg(¤tNode, msg.Payload())
|
||||
if decryptErr != nil {
|
||||
logger.Log(1, "failed to decrypt message during client peer update for node ", id, decryptErr.Error())
|
||||
return
|
||||
}
|
||||
switch decrypted[0] {
|
||||
case ncutils.ACK:
|
||||
//do we still need this
|
||||
case ncutils.DONE:
|
||||
updateNodePeers(¤tNode)
|
||||
}
|
||||
|
||||
logger.Log(1, "sent peer updates after signal received from", id)
|
||||
}()
|
||||
}
|
||||
|
||||
func updateNodePeers(currentNode *models.Node) {
|
||||
if err := PublishPeerUpdate(); err != nil {
|
||||
logger.Log(1, "error publishing peer update ", err.Error())
|
||||
id, err := getID(msg.Topic())
|
||||
if err != nil {
|
||||
logger.Log(1, "error getting node.ID sent on ", msg.Topic(), err.Error())
|
||||
return
|
||||
}
|
||||
currentNode, err := logic.GetNodeByID(id)
|
||||
if err != nil {
|
||||
logger.Log(1, "error getting node ", id, err.Error())
|
||||
return
|
||||
}
|
||||
decrypted, decryptErr := decryptMsg(¤tNode, msg.Payload())
|
||||
if decryptErr != nil {
|
||||
logger.Log(1, "failed to decrypt message during client peer update for node ", id, decryptErr.Error())
|
||||
return
|
||||
}
|
||||
switch decrypted[0] {
|
||||
case ncutils.ACK:
|
||||
// do we still need this
|
||||
case ncutils.DONE:
|
||||
if err = PublishPeerUpdate(); err != nil {
|
||||
logger.Log(1, "error publishing peer update for node", currentNode.ID.String(), err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
logger.Log(1, "sent peer updates after signal received from", id)
|
||||
}
|
||||
|
||||
func updateNodeMetrics(currentNode *models.Node, newMetrics *models.Metrics) bool {
|
||||
|
|
Loading…
Reference in a new issue