netmaker/mq/handlers.go

323 lines
10 KiB
Go
Raw Normal View History

package mq
import (
"encoding/json"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/google/uuid"
"github.com/gravitl/netmaker/database"
"github.com/gravitl/netmaker/logger"
"github.com/gravitl/netmaker/logic"
"github.com/gravitl/netmaker/logic/hostactions"
"github.com/gravitl/netmaker/models"
2022-02-17 22:08:20 +08:00
"github.com/gravitl/netmaker/netclient/ncutils"
2022-09-14 03:25:56 +08:00
"github.com/gravitl/netmaker/servercfg"
"golang.org/x/exp/slog"
)
var (
	// UpdateMetrics message Handler -- handles updates from client nodes for
	// metrics. It is a no-op in this file; because it is a package-level
	// variable it can be reassigned elsewhere (any such assignment is not
	// visible here).
	UpdateMetrics = func(client mqtt.Client, msg mqtt.Message) {}

	// UpdateMetricsFallBack receives a node ID and that node's new metrics.
	// Like UpdateMetrics it is a no-op here and is declared as a variable so
	// another implementation can be assigned to it.
	UpdateMetricsFallBack = func(nodeid string, newMetrics models.Metrics) {}
)
// DefaultHandler default message queue handler -- NOT USED.
//
// It only logs the topic and the raw payload bytes of the message it
// receives; the client argument is ignored.
func DefaultHandler(client mqtt.Client, msg mqtt.Message) {
	topic, payload := msg.Topic(), msg.Payload()
	slog.Info("mqtt default handler", "topic", topic, "message", payload)
}
// UpdateNode message Handler -- handles updates from client nodes.
//
// The node ID is read from the message topic. The payload is decrypted with
// that node's key and decoded as a models.Node. The node is then saved with
// logic.UpdateNode after its last check-in time is set.
//
// Peers are notified only when logic.IfaceDelta reports a change. For a node
// that is no longer Connected, a deleted-node peer update is published,
// followed by a peer update to the node's own host. Otherwise
// PublishPeerUpdate(false) is called.
//
// All failures are logged; the handler returns nothing.
func UpdateNode(client mqtt.Client, msg mqtt.Message) {
	id, err := GetID(msg.Topic())
	if err != nil {
		slog.Error("error getting node.ID ", "topic", msg.Topic(), "error", err)
		return
	}
	currentNode, err := logic.GetNodeByID(id)
	if err != nil {
		slog.Error("error getting node", "id", id, "error", err)
		return
	}
	decrypted, decryptErr := DecryptMsg(&currentNode, msg.Payload())
	if decryptErr != nil {
		slog.Error("failed to decrypt message for node", "id", id, "error", decryptErr)
		return
	}
	var newNode models.Node
	if err := json.Unmarshal(decrypted, &newNode); err != nil {
		slog.Error("error unmarshaling payload", "error", err)
		return
	}
	// Compute the interface delta before the update is saved.
	ifaceDelta := logic.IfaceDelta(&currentNode, &newNode)
	newNode.SetLastCheckIn()
	if err := logic.UpdateNode(&currentNode, &newNode); err != nil {
		slog.Error("error saving node", "id", id, "error", err)
		return
	}
	if ifaceDelta { // reduce number of unneeded updates, by only sending on iface changes
		const peerUpdateFailed = "error updating peers when node informed the server of an interface change"
		if !newNode.Connected {
			// Log each failure where it happens instead of funnelling them
			// through a shadowed err that only some paths reached.
			if err := PublishDeletedNodePeerUpdate(&newNode); err != nil {
				slog.Warn(peerUpdateFailed, "nodeid", currentNode.ID, "error", err)
			}
			host, err := logic.GetHost(newNode.HostID.String())
			if err != nil {
				slog.Error("failed to get host for the node", "nodeid", newNode.ID.String(), "error", err)
				return
			}
			allNodes, err := logic.GetAllNodes()
			if err != nil {
				slog.Warn(peerUpdateFailed, "nodeid", currentNode.ID, "error", err)
			} else if err := PublishSingleHostPeerUpdate(host, allNodes, nil, nil, false); err != nil {
				slog.Warn(peerUpdateFailed, "nodeid", currentNode.ID, "error", err)
			}
		} else if err := PublishPeerUpdate(false); err != nil {
			slog.Warn(peerUpdateFailed, "nodeid", currentNode.ID, "error", err)
		}
	}
	slog.Info("updated node", "id", id, "newnodeid", newNode.ID)
}
// UpdateHost message Handler -- handles host updates from clients
func UpdateHost(client mqtt.Client, msg mqtt.Message) {
id, err := GetID(msg.Topic())
if err != nil {
slog.Error("error getting host.ID sent on ", "topic", msg.Topic(), "error", err)
return
}
currentHost, err := logic.GetHost(id)
if err != nil {
slog.Error("error getting host", "id", id, "error", err)
return
}
decrypted, decryptErr := decryptMsgWithHost(currentHost, msg.Payload())
if decryptErr != nil {
NET-1082: Scale Testing Fixes (#2894) * add additional mutex lock on node acls func * increase verbosity * disable acls on cloud emqx * add emqx creds creation to go routine * add debug log of mq client id * comment port check * uncomment port check * check for connection mq connection open * use username for client id * add write mutex on acl is allowed * add mq connection lost handler on server * spin off zombie init as go routine * get whole api path from config * Revert "get whole api path from config" This reverts commit 392f5f4c5f00530788f09d26a655dcbe03a06ccb. * update extclient acls async * add additional mutex lock on node acls func (cherry picked from commit 5325f0e7d7ff9411f497fdc38c980ac0c3a6847d) * increase verbosity (cherry picked from commit 705b3cf0bfbca4d7f5dccdd579875ebb00f85511) * add emqx creds creation to go routine (cherry picked from commit c8e65f4820771eb0c7c7d62b77334211c6b82adb) * add debug log of mq client id (cherry picked from commit 29c5d6cecad6fcaeb4a57bac85895d6516294e28) * comment port check (cherry picked from commit db8d6d95ead39e9f436ad4dbc4176f5ff9312863) * check for connection mq connection open (cherry picked from commit 13b11033b0795693a0a1a0a225db50b8a5c001ae) * use username for client id (cherry picked from commit e90c7386dea48b560c9060289f1db06f2a8d77c1) * add write mutex on acl is allowed (cherry picked from commit 4cae1b0bb4b4b608fdb76b3ef49c7fd7390c9ccb) * add mq connection lost handler on server (cherry picked from commit c82918ad3564098487d5ac4a223ee5d95e76ac3e) * spin off zombie init as go routine (cherry picked from commit 6d65c44c4375ff7a292c05d2becf6507e7310837) * update extclient acls async (cherry picked from commit 6557ef1ebe87ec7c74e9038a99185616a2b87e89) * additionl logs for oauth user flow (cherry picked from commit 61703038ae3227de6f706e1d1d9a9362e4c258d5) * add more debug logs (cherry picked from commit 5980beacd10e7efa1cfbc12e6c048452d9ceca82) * add more debug logs (cherry picked from commit 
4d001f0d2709fcc55ae66e2591a0bd079207a61d) * add set auth secret (cherry picked from commit f41cef5da51a884e68d12ac90ae627428b02b112) * fix fetch pass (cherry picked from commit 825caf4b600133f8365f5ab71e0ecd69cc8a996b) * make sure auth secret is set only once (cherry picked from commit ba33ed02aa52237126904f4bbe17b53dd595d7cf) * make sure auth secret is set only once (cherry picked from commit 920ac4c5073fb2c8805520d37abddffb062896ae) * comment usage of emqx acls * replace read lock with write lock on acls * replace read lock with write lock on acls (cherry picked from commit 808d2135c80461edfa96b61070e09ad136fa4644) * use deadlock pkg for visibility * add additional mutex locks * remove race flag * on mq re-connecting donot exit if failed * on mq re-connecting donot exit if failed * revert mutex package change * set mq clean session * remove debug log * go mod tidy * revert on prem emqx acls del
2024-04-11 23:48:57 +08:00
slog.Error("failed to decrypt message for host", "id", id, "name", currentHost.Name, "error", decryptErr)
return
}
var hostUpdate models.HostUpdate
if err := json.Unmarshal(decrypted, &hostUpdate); err != nil {
slog.Error("error unmarshaling payload", "error", err)
return
}
slog.Info("recieved host update", "name", hostUpdate.Host.Name, "id", hostUpdate.Host.ID)
var sendPeerUpdate bool
var replacePeers bool
switch hostUpdate.Action {
2023-03-18 03:58:06 +08:00
case models.CheckIn:
sendPeerUpdate = HandleHostCheckin(&hostUpdate.Host, currentHost)
case models.Acknowledgement:
hu := hostactions.GetAction(currentHost.ID.String())
if hu != nil {
if err = HostUpdate(hu); err != nil {
slog.Error("failed to send new node to host", "name", hostUpdate.Host.Name, "id", currentHost.ID, "error", err)
return
} else {
2023-03-21 20:25:51 +08:00
if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
if err = emqx.AppendNodeUpdateACL(hu.Host.ID.String(), hu.Node.Network, hu.Node.ID.String(), servercfg.GetServer()); err != nil {
slog.Error("failed to add ACLs for EMQX node", "error", err)
2023-03-21 20:25:51 +08:00
return
}
}
2023-06-28 23:35:21 +08:00
nodes, err := logic.GetAllNodes()
if err != nil {
return
}
if err = PublishSingleHostPeerUpdate(currentHost, nodes, nil, nil, false); err != nil {
slog.Error("failed peers publish after join acknowledged", "name", hostUpdate.Host.Name, "id", currentHost.ID, "error", err)
return
}
}
}
case models.UpdateHost:
2023-04-18 06:20:09 +08:00
if hostUpdate.Host.PublicKey != currentHost.PublicKey {
//remove old peer entry
replacePeers = true
2023-04-18 06:20:09 +08:00
}
sendPeerUpdate = logic.UpdateHostFromClient(&hostUpdate.Host, currentHost)
err := logic.UpsertHost(currentHost)
if err != nil {
slog.Error("failed to update host", "id", currentHost.ID, "error", err)
return
}
case models.DeleteHost:
if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
// delete EMQX credentials for host
if err := emqx.DeleteEmqxUser(currentHost.ID.String()); err != nil {
slog.Error("failed to remove host credentials from EMQX", "id", currentHost.ID, "error", err)
}
2022-09-08 18:19:10 +08:00
}
// notify of deleted peer change
go func(host models.Host) {
for _, nodeID := range host.Nodes {
node, err := logic.GetNodeByID(nodeID)
if err == nil {
var gwClients []models.ExtClient
if node.IsIngressGateway {
gwClients = logic.GetGwExtclients(node.ID.String(), node.Network)
}
go PublishMqUpdatesForDeletedNode(node, false, gwClients)
}
}
}(*currentHost)
if err := logic.DisassociateAllNodesFromHost(currentHost.ID.String()); err != nil {
slog.Error("failed to delete all nodes of host", "id", currentHost.ID, "error", err)
return
}
if err := logic.RemoveHostByID(currentHost.ID.String()); err != nil {
slog.Error("failed to delete host", "id", currentHost.ID, "error", err)
return
}
if servercfg.IsDNSMode() {
logic.SetDNS()
}
sendPeerUpdate = true
case models.SignalHost:
signalPeer(hostUpdate.Signal)
}
if sendPeerUpdate {
err := PublishPeerUpdate(replacePeers)
if err != nil {
slog.Error("failed to publish peer update", "error", err)
}
}
2022-09-14 03:25:56 +08:00
}
// signalPeer relays a client's signal to the peer host it targets.
//
// Signals without a ToHostPubKey are dropped. The target host (ToHostID),
// the target node (ToNodeID) and the source node (FromNodeID) must all be
// found, otherwise the failure is logged and the signal dropped. Signals are
// also dropped when either node is an ingress or internet gateway.
//
// Otherwise the signal is tagged with servercfg.IsPro and published to the
// target host as a SignalHost update.
func signalPeer(signal models.Signal) {
	if signal.ToHostPubKey == "" {
		msg := "insufficient data to signal peer"
		logger.Log(0, msg)
		return
	}
	signal.IsPro = servercfg.IsPro
	peerHost, err := logic.GetHost(signal.ToHostID)
	if err != nil {
		slog.Error("failed to signal, peer host not found", "hostid", signal.ToHostID, "error", err)
		return
	}
	// ToNodeID is the peer's node and FromNodeID is the sender's node; the
	// log messages previously had these two swapped.
	peerNode, err := logic.GetNodeByID(signal.ToNodeID)
	if err != nil {
		slog.Error("failed to signal, peer node not found", "nodeid", signal.ToNodeID, "error", err)
		return
	}
	node, err := logic.GetNodeByID(signal.FromNodeID)
	if err != nil {
		slog.Error("failed to signal, node not found", "nodeid", signal.FromNodeID, "error", err)
		return
	}
	// Gateway nodes are not signalled. The old `signal.Action = ""` here was
	// a dead store to a by-value parameter and has been dropped.
	if peerNode.IsIngressGateway || node.IsIngressGateway || peerNode.IsInternetGateway || node.IsInternetGateway {
		return
	}
	err = HostUpdate(&models.HostUpdate{
		Action: models.SignalHost,
		Host:   *peerHost,
		Signal: signal,
	})
	if err != nil {
		slog.Error("failed to publish signal to peer", "error", err)
	}
}
// ClientPeerUpdate message handler -- handles updating peers after signal from client nodes.
//
// The node ID is read from the message topic, and the payload is decrypted
// with that node's key. Only the first decrypted byte is inspected:
// ncutils.DONE triggers PublishPeerUpdate(false), while ncutils.ACK and any
// other value are ignored. An empty payload is logged and ignored rather
// than indexed, which would panic.
func ClientPeerUpdate(client mqtt.Client, msg mqtt.Message) {
	id, err := GetID(msg.Topic())
	if err != nil {
		slog.Error("error getting node.ID sent on ", "topic", msg.Topic(), "error", err)
		return
	}
	currentNode, err := logic.GetNodeByID(id)
	if err != nil {
		slog.Error("error getting node", "id", id, "error", err)
		return
	}
	decrypted, decryptErr := DecryptMsg(&currentNode, msg.Payload())
	if decryptErr != nil {
		slog.Error("failed to decrypt message for node", "id", id, "error", decryptErr)
		return
	}
	if len(decrypted) == 0 {
		slog.Error("empty peer update signal from node", "id", id)
		return
	}
	switch decrypted[0] {
	case ncutils.ACK:
		// do we still need this
	case ncutils.DONE:
		if err := PublishPeerUpdate(false); err != nil {
			slog.Error("error publishing peer update for node", "id", currentNode.ID, "error", err)
			return
		}
		// Only log "sent" when a peer update was actually published.
		slog.Info("sent peer updates after signal received from", "id", id)
	}
}
2022-09-14 03:25:56 +08:00
func HandleHostCheckin(h, currentHost *models.Host) bool {
2023-03-18 03:58:06 +08:00
if h == nil {
return false
}
for i := range currentHost.Nodes {
currNodeID := currentHost.Nodes[i]
node, err := logic.GetNodeByID(currNodeID)
if err != nil {
if database.IsEmptyRecord(err) {
fakeNode := models.Node{}
fakeNode.ID, _ = uuid.Parse(currNodeID)
fakeNode.Action = models.NODE_DELETE
fakeNode.PendingDelete = true
if err := NodeUpdate(&fakeNode); err != nil {
slog.Warn("failed to inform host to remove node", "host", currentHost.Name, "hostid", currentHost.ID, "nodeid", currNodeID, "error", err)
2023-03-18 03:58:06 +08:00
}
}
continue
}
2023-03-22 05:47:15 +08:00
if err := logic.UpdateNodeCheckin(&node); err != nil {
slog.Warn("failed to update node on checkin", "nodeid", node.ID, "error", err)
2023-03-18 03:58:06 +08:00
}
}
for i := range h.Interfaces {
h.Interfaces[i].AddressString = h.Interfaces[i].Address.String()
}
/// version or firewall in use change does not require a peerUpdate
if h.Version != currentHost.Version || h.FirewallInUse != currentHost.FirewallInUse {
currentHost.FirewallInUse = h.FirewallInUse
currentHost.Version = h.Version
if err := logic.UpsertHost(currentHost); err != nil {
slog.Error("failed to update host after check-in", "name", h.Name, "id", h.ID, "error", err)
return false
}
}
2023-03-24 03:37:11 +08:00
ifaceDelta := len(h.Interfaces) != len(currentHost.Interfaces) ||
!h.EndpointIP.Equal(currentHost.EndpointIP) ||
(len(h.NatType) > 0 && h.NatType != currentHost.NatType) ||
2023-06-29 00:40:25 +08:00
h.DefaultInterface != currentHost.DefaultInterface ||
(h.ListenPort != 0 && h.ListenPort != currentHost.ListenPort) ||
(h.WgPublicListenPort != 0 && h.WgPublicListenPort != currentHost.WgPublicListenPort) || (!h.EndpointIPv6.Equal(currentHost.EndpointIPv6))
2023-03-24 03:37:11 +08:00
if ifaceDelta { // only save if something changes
currentHost.EndpointIP = h.EndpointIP
currentHost.EndpointIPv6 = h.EndpointIPv6
2023-03-24 03:37:11 +08:00
currentHost.Interfaces = h.Interfaces
currentHost.DefaultInterface = h.DefaultInterface
currentHost.NatType = h.NatType
2023-06-29 00:40:25 +08:00
if h.ListenPort != 0 {
currentHost.ListenPort = h.ListenPort
}
if h.WgPublicListenPort != 0 {
currentHost.WgPublicListenPort = h.WgPublicListenPort
}
2023-03-24 03:37:11 +08:00
if err := logic.UpsertHost(currentHost); err != nil {
slog.Error("failed to update host after check-in", "name", h.Name, "id", h.ID, "error", err)
2023-03-24 03:37:11 +08:00
return false
}
slog.Info("updated host after check-in", "name", currentHost.Name, "id", currentHost.ID)
2023-03-18 03:58:06 +08:00
}
slog.Info("check-in processed for host", "name", h.Name, "id", h.ID)
2023-03-22 05:47:15 +08:00
return ifaceDelta
2023-03-18 03:58:06 +08:00
}