NET-427: Peer Update fixes (#2445)

* send delete peer update always

* fix add/remove host api calls

* keep mq updates in a single go func
This commit is contained in:
Abhishek K 2023-07-11 13:00:09 +05:30 committed by GitHub
parent 184011f1f2
commit 7121f370c6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 41 additions and 22 deletions

View file

@ -9,7 +9,6 @@ import (
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/gravitl/netmaker/logger" "github.com/gravitl/netmaker/logger"
"github.com/gravitl/netmaker/logic" "github.com/gravitl/netmaker/logic"
"github.com/gravitl/netmaker/logic/hostactions"
"github.com/gravitl/netmaker/models" "github.com/gravitl/netmaker/models"
"github.com/gravitl/netmaker/mq" "github.com/gravitl/netmaker/mq"
"github.com/gravitl/netmaker/servercfg" "github.com/gravitl/netmaker/servercfg"
@ -261,18 +260,14 @@ func addHostToNetwork(w http.ResponseWriter, r *http.Request) {
return return
} }
logger.Log(1, "added new node", newNode.ID.String(), "to host", currHost.Name) logger.Log(1, "added new node", newNode.ID.String(), "to host", currHost.Name)
hostactions.AddAction(models.HostUpdate{ go func() {
Action: models.JoinHostToNetwork,
Host: *currHost,
Node: *newNode,
})
if servercfg.IsMessageQueueBackend() {
mq.HostUpdate(&models.HostUpdate{ mq.HostUpdate(&models.HostUpdate{
Action: models.RequestAck, Action: models.JoinHostToNetwork,
Host: *currHost, Host: *currHost,
Node: *newNode,
}) })
} mq.PublishPeerUpdate()
}()
logger.Log(2, r.Header.Get("user"), fmt.Sprintf("added host %s to network %s", currHost.Name, network)) logger.Log(2, r.Header.Get("user"), fmt.Sprintf("added host %s to network %s", currHost.Name, network))
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
} }
@ -312,6 +307,25 @@ func deleteHostFromNetwork(w http.ResponseWriter, r *http.Request) {
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal")) logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
return return
} }
if node.IsRelayed {
// cleanup node from relayednodes on relay node
relayNode, err := logic.GetNodeByID(node.RelayedBy)
if err == nil {
relayedNodes := []string{}
for _, relayedNodeID := range relayNode.RelayedNodes {
if relayedNodeID == node.ID.String() {
continue
}
relayedNodes = append(relayedNodes, relayedNodeID)
}
relayNode.RelayedNodes = relayedNodes
logic.UpsertNode(&relayNode)
}
}
if node.IsRelay {
// unset all the relayed nodes
logic.SetRelayedNodes(false, node.ID.String(), node.RelayedNodes)
}
node.Action = models.NODE_DELETE node.Action = models.NODE_DELETE
node.PendingDelete = true node.PendingDelete = true
logger.Log(1, "deleting node", node.ID.String(), "from host", currHost.Name) logger.Log(1, "deleting node", node.ID.String(), "from host", currHost.Name)
@ -320,10 +334,10 @@ func deleteHostFromNetwork(w http.ResponseWriter, r *http.Request) {
return return
} }
// notify node change // notify node change
runUpdates(node, false) runUpdates(node, false)
go func() { // notify of peer change go func() { // notify of peer change
if err := mq.PublishPeerUpdate(); err != nil { err = mq.PublishDeletedNodePeerUpdate(node)
if err != nil {
logger.Log(1, "error publishing peer update ", err.Error()) logger.Log(1, "error publishing peer update ", err.Error())
} }
if err := mq.PublishDNSDelete(node, currHost); err != nil { if err := mq.PublishDNSDelete(node, currHost); err != nil {

View file

@ -755,7 +755,6 @@ func deleteNode(w http.ResponseWriter, r *http.Request) {
relayNode.RelayedNodes = relayedNodes relayNode.RelayedNodes = relayedNodes
logic.UpsertNode(&relayNode) logic.UpsertNode(&relayNode)
} }
} }
if node.IsRelay { if node.IsRelay {
// unset all the relayed nodes // unset all the relayed nodes
@ -772,17 +771,12 @@ func deleteNode(w http.ResponseWriter, r *http.Request) {
if !fromNode { // notify node change if !fromNode { // notify node change
runUpdates(&node, false) runUpdates(&node, false)
} }
go func(deletedNode *models.Node, fromNode bool) { // notify of peer change go func() { // notify of peer change
var err error var err error
if fromNode { err = mq.PublishDeletedNodePeerUpdate(&node)
err = mq.PublishDeletedNodePeerUpdate(deletedNode)
} else {
err = mq.PublishPeerUpdate()
}
if err != nil { if err != nil {
logger.Log(1, "error publishing peer update ", err.Error()) logger.Log(1, "error publishing peer update ", err.Error())
} }
host, err := logic.GetHost(node.HostID.String()) host, err := logic.GetHost(node.HostID.String())
if err != nil { if err != nil {
logger.Log(1, "failed to retrieve host for node", node.ID.String(), err.Error()) logger.Log(1, "failed to retrieve host for node", node.ID.String(), err.Error())
@ -790,7 +784,7 @@ func deleteNode(w http.ResponseWriter, r *http.Request) {
if err := mq.PublishDNSDelete(&node, host); err != nil { if err := mq.PublishDNSDelete(&node, host); err != nil {
logger.Log(1, "error publishing dns update", err.Error()) logger.Log(1, "error publishing dns update", err.Error())
} }
}(&node, fromNode) }()
} }
func runUpdates(node *models.Node, ifaceDelta bool) { func runUpdates(node *models.Node, ifaceDelta bool) {

View file

@ -312,7 +312,6 @@ func GetPeerUpdateForHost(network string, host *models.Host, allNodes []models.N
} }
hostPeerUpdate.NodePeers = append(hostPeerUpdate.NodePeers, nodePeer) hostPeerUpdate.NodePeers = append(hostPeerUpdate.NodePeers, nodePeer)
} }
//}
} }
var extPeers []wgtypes.PeerConfig var extPeers []wgtypes.PeerConfig
var extPeerIDAndAddrs []models.IDandAddr var extPeerIDAndAddrs []models.IDandAddr
@ -387,6 +386,18 @@ func GetPeerUpdateForHost(network string, host *models.Host, allNodes []models.N
} }
hostPeerUpdate.Peers[i] = peer hostPeerUpdate.Peers[i] = peer
} }
if deletedNode != nil {
peerHost, err := GetHost(deletedNode.HostID.String())
if err == nil && host.ID != peerHost.ID {
if _, ok := peerIndexMap[peerHost.PublicKey.String()]; !ok {
hostPeerUpdate.Peers = append(hostPeerUpdate.Peers, wgtypes.PeerConfig{
PublicKey: peerHost.PublicKey,
Remove: true,
})
}
}
}
for i := range hostPeerUpdate.NodePeers { for i := range hostPeerUpdate.NodePeers {
peer := hostPeerUpdate.NodePeers[i] peer := hostPeerUpdate.NodePeers[i]