From bf687dd478592cd34f3c9fd0233fed1bc8131b18 Mon Sep 17 00:00:00 2001 From: 0xdcarns Date: Fri, 17 Mar 2023 15:58:06 -0400 Subject: [PATCH] add return --- models/host.go | 2 + mq/handlers.go | 130 ++++++++++++++++--------------------------------- mq/mq.go | 4 -- 3 files changed, 44 insertions(+), 92 deletions(-) diff --git a/models/host.go b/models/host.go index 4b2231b6..13e44b50 100644 --- a/models/host.go +++ b/models/host.go @@ -94,6 +94,8 @@ const ( Acknowledgement = "ACK" // RequestAck - request an ACK RequestAck = "REQ_ACK" + // CheckIn - update last check in times and public address and interfaces + CheckIn = "CHECK_IN" ) // HostUpdate - struct for host update diff --git a/mq/handlers.go b/mq/handlers.go index 2d07fd6e..4b53743c 100644 --- a/mq/handlers.go +++ b/mq/handlers.go @@ -23,94 +23,6 @@ func DefaultHandler(client mqtt.Client, msg mqtt.Message) { logger.Log(0, "MQTT Message: Topic: ", string(msg.Topic()), " Message: ", string(msg.Payload())) } -// Ping message Handler -- handles ping topic from client nodes -func Ping(client mqtt.Client, msg mqtt.Message) { - id, err := getID(msg.Topic()) - if err != nil { - logger.Log(0, "error getting node.ID sent on ping topic ") - return - } - node, err := logic.GetNodeByID(id) - if err != nil { - logger.Log(3, "mq-ping error getting node: ", err.Error()) - node, err := logic.GetNodeByID(id) - if err != nil { - logger.Log(3, "mq-ping error getting node: ", err.Error()) - if database.IsEmptyRecord(err) { - h := logic.GetHostByNodeID(id) // check if a host is still associated - if h != nil { // inform host that node should be removed - fakeNode := models.Node{} - fakeNode.ID, _ = uuid.Parse(id) - fakeNode.Action = models.NODE_DELETE - fakeNode.PendingDelete = true - if err := NodeUpdate(&fakeNode); err != nil { - logger.Log(0, "failed to inform host", h.Name, h.ID.String(), "to remove node", id, err.Error()) - } - } - } - return - } - decrypted, decryptErr := decryptMsg(&node, msg.Payload()) - if decryptErr != nil { - logger.Log(0, "error decrypting when updating node ", node.ID.String(), decryptErr.Error()) - return - } - var checkin models.NodeCheckin - if err := json.Unmarshal(decrypted, &checkin); err != nil { - logger.Log(1, "error unmarshaling payload ", err.Error()) - return - } - host, err := logic.GetHost(node.HostID.String()) - if err != nil { - logger.Log(0, "error retrieving host for node ", node.ID.String(), err.Error()) - return - } - node.SetLastCheckIn() - host.Version = checkin.Version - node.Connected = checkin.Connected - host.Interfaces = checkin.Ifaces - for i := range host.Interfaces { - host.Interfaces[i].AddressString = host.Interfaces[i].Address.String() - } - if err := logic.UpdateNode(&node, &node); err != nil { - logger.Log(0, "error updating node", node.ID.String(), " on checkin", err.Error()) - return - } - - return - } - decrypted, decryptErr := decryptMsg(&node, msg.Payload()) - if decryptErr != nil { - logger.Log(0, "error decrypting when updating node ", node.ID.String(), decryptErr.Error()) - return - } - var checkin models.NodeCheckin - if err := json.Unmarshal(decrypted, &checkin); err != nil { - logger.Log(1, "error unmarshaling payload ", err.Error()) - return - } - host, err := logic.GetHost(node.HostID.String()) - if err != nil { - logger.Log(0, "error retrieving host for node ", node.ID.String(), err.Error()) - return - } - node.SetLastCheckIn() - host.Version = checkin.Version - node.Connected = checkin.Connected - host.Interfaces = checkin.Ifaces - for i := range host.Interfaces { - host.Interfaces[i].AddressString = host.Interfaces[i].Address.String() - } - if err := logic.UpdateNode(&node, &node); err != nil { - logger.Log(0, "error updating node", node.ID.String(), " on checkin", err.Error()) - return - } - - logger.Log(3, "ping processed for node", node.ID.String()) - // --TODO --set client version once feature is implemented. - //node.SetClientVersion(msg.Payload()) -} - // UpdateNode message Handler -- handles updates from client nodes func UpdateNode(client mqtt.Client, msg mqtt.Message) { id, err := getID(msg.Topic()) @@ -179,6 +91,8 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) { logger.Log(3, fmt.Sprintf("recieved host update: %s\n", hostUpdate.Host.ID.String())) var sendPeerUpdate bool switch hostUpdate.Action { + case models.CheckIn: + sendPeerUpdate = handleHostCheckin(&hostUpdate.Host, currentHost) case models.Acknowledgement: hu := hostactions.GetAction(currentHost.ID.String()) if hu != nil { @@ -447,3 +361,43 @@ func handleNewNodeDNS(host *models.Host, node *models.Node) error { } return nil } + +func handleHostCheckin(h, currentHost *models.Host) bool { + if h == nil { + return false + } + + for i := range currentHost.Nodes { + currNodeID := currentHost.Nodes[i] + node, err := logic.GetNodeByID(currNodeID) + if err != nil { + if database.IsEmptyRecord(err) { + fakeNode := models.Node{} + fakeNode.ID, _ = uuid.Parse(currNodeID) + fakeNode.Action = models.NODE_DELETE + fakeNode.PendingDelete = true + if err := NodeUpdate(&fakeNode); err != nil { + logger.Log(0, "failed to inform host", currentHost.Name, currentHost.ID.String(), "to remove node", currNodeID, err.Error()) + } + } + continue + } + node.SetLastCheckIn() + if err := logic.UpdateNode(&node, &node); err != nil { + logger.Log(0, "error updating node", node.ID.String(), " on checkin", err.Error()) + } + } + + for i := range h.Interfaces { + h.Interfaces[i].AddressString = h.Interfaces[i].Address.String() + } + h.HostPass = currentHost.HostPass + if err := logic.UpsertHost(h); err != nil { + logger.Log(0, "failed to update host after check-in", h.Name, h.ID.String(), err.Error()) + return false + } + + logger.Log(3, "ping processed for host", h.Name, h.ID.String()) + return len(h.Interfaces) != len(currentHost.Interfaces) || + !h.EndpointIP.Equal(currentHost.EndpointIP) +} diff --git a/mq/mq.go b/mq/mq.go index b8e0d458..fe04fce6 100644 --- a/mq/mq.go +++ b/mq/mq.go @@ -53,10 +53,6 @@ func SetupMQTT() { opts := mqtt.NewClientOptions() setMqOptions(servercfg.GetMqUserName(), servercfg.GetMqPassword(), opts) opts.SetOnConnectHandler(func(client mqtt.Client) { - if token := client.Subscribe("ping/#", 2, mqtt.MessageHandler(Ping)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil { - client.Disconnect(240) - logger.Log(0, "ping subscription failed") - } if token := client.Subscribe("update/#", 0, mqtt.MessageHandler(UpdateNode)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil { client.Disconnect(240) logger.Log(0, "node update subscription failed")