refacted running node updates

This commit is contained in:
0xdcarns 2022-02-16 20:42:57 -05:00
parent f1b5518bf3
commit 922ebcb4ab
6 changed files with 90 additions and 117 deletions

View file

@ -3,7 +3,6 @@ package controller
import ( import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt"
"net/http" "net/http"
"strings" "strings"
@ -12,7 +11,6 @@ import (
"github.com/gravitl/netmaker/logger" "github.com/gravitl/netmaker/logger"
"github.com/gravitl/netmaker/logic" "github.com/gravitl/netmaker/logic"
"github.com/gravitl/netmaker/models" "github.com/gravitl/netmaker/models"
"github.com/gravitl/netmaker/mq"
"github.com/gravitl/netmaker/servercfg" "github.com/gravitl/netmaker/servercfg"
) )
@ -109,17 +107,9 @@ func keyUpdate(w http.ResponseWriter, r *http.Request) {
return return
} }
for _, node := range nodes { for _, node := range nodes {
fmt.Println("updating node ", node.Name, " for a key update") logger.Log(3, "updating node ", node.Name, " for a key update")
if err := mq.NodeUpdate(&node); err != nil { runUpdates(&node, true)
logger.Log(2, "failed key update ", node.Name)
}
} }
node, err := logic.GetNetworkServerLeader(netname)
if err != nil {
logger.Log(2, "failed to get server node")
return
}
runUpdates(&node, false, false)
} }
// Update a network // Update a network
@ -184,7 +174,7 @@ func updateNetwork(w http.ResponseWriter, r *http.Request) {
return return
} }
for _, node := range nodes { for _, node := range nodes {
runUpdates(&node, true, false) runUpdates(&node, true)
} }
} }

View file

@ -2,7 +2,6 @@ package controller
import ( import (
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"net/http" "net/http"
"strings" "strings"
@ -396,8 +395,8 @@ func createNode(w http.ResponseWriter, r *http.Request) {
validKey := logic.IsKeyValid(networkName, node.AccessKey) validKey := logic.IsKeyValid(networkName, node.AccessKey)
if !validKey { if !validKey {
//Check to see if network will allow manual sign up // Check to see if network will allow manual sign up
//may want to switch this up with the valid key check and avoid a DB call that way. // may want to switch this up with the valid key check and avoid a DB call that way.
if network.AllowManualSignUp == "yes" { if network.AllowManualSignUp == "yes" {
node.IsPending = "yes" node.IsPending = "yes"
} else { } else {
@ -420,8 +419,8 @@ func createNode(w http.ResponseWriter, r *http.Request) {
json.NewEncoder(w).Encode(node) json.NewEncoder(w).Encode(node)
} }
//Takes node out of pending state // Takes node out of pending state
//TODO: May want to use cordon/uncordon terminology instead of "ispending". // TODO: May want to use cordon/uncordon terminology instead of "ispending".
func uncordonNode(w http.ResponseWriter, r *http.Request) { func uncordonNode(w http.ResponseWriter, r *http.Request) {
var params = mux.Vars(r) var params = mux.Vars(r)
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
@ -438,6 +437,8 @@ func uncordonNode(w http.ResponseWriter, r *http.Request) {
mq.NodeUpdate(&node) mq.NodeUpdate(&node)
} }
// == EGRESS ==
func createEgressGateway(w http.ResponseWriter, r *http.Request) { func createEgressGateway(w http.ResponseWriter, r *http.Request) {
var gateway models.EgressGatewayRequest var gateway models.EgressGatewayRequest
var params = mux.Vars(r) var params = mux.Vars(r)
@ -459,7 +460,7 @@ func createEgressGateway(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(node) json.NewEncoder(w).Encode(node)
runUpdates(&node, true, false) runUpdates(&node, true)
} }
func deleteEgressGateway(w http.ResponseWriter, r *http.Request) { func deleteEgressGateway(w http.ResponseWriter, r *http.Request) {
@ -477,7 +478,7 @@ func deleteEgressGateway(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(node) json.NewEncoder(w).Encode(node)
runUpdates(&node, true, false) runUpdates(&node, true)
} }
// == INGRESS == // == INGRESS ==
@ -497,7 +498,7 @@ func createIngressGateway(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(node) json.NewEncoder(w).Encode(node)
runUpdates(&node, true, false) runUpdates(&node, true)
} }
func deleteIngressGateway(w http.ResponseWriter, r *http.Request) { func deleteIngressGateway(w http.ResponseWriter, r *http.Request) {
@ -514,7 +515,7 @@ func deleteIngressGateway(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(node) json.NewEncoder(w).Encode(node)
runUpdates(&node, true, false) runUpdates(&node, true)
} }
func updateNode(w http.ResponseWriter, r *http.Request) { func updateNode(w http.ResponseWriter, r *http.Request) {
@ -570,10 +571,7 @@ func updateNode(w http.ResponseWriter, r *http.Request) {
} }
if len(updatenodes) > 0 { if len(updatenodes) > 0 {
for _, relayedNode := range updatenodes { for _, relayedNode := range updatenodes {
err = mq.NodeUpdate(&relayedNode) runUpdates(&relayedNode, false)
if err != nil {
logger.Log(1, "error sending update to relayed node ", relayedNode.Address, "on network", node.Network, ": ", err.Error())
}
} }
} }
} }
@ -586,18 +584,7 @@ func updateNode(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(newNode) json.NewEncoder(w).Encode(newNode)
if isServer(&node) { runUpdates(&newNode, ifaceDelta)
if ifaceDelta {
if err := mq.PublishPeerUpdate(&node); err != nil {
logger.Log(1, "failed to publish peer update "+err.Error())
}
}
if err := logic.UpdateNode(&node, &newNode); err != nil {
logger.Log(1, "error updating server node "+err.Error())
}
} else {
runUpdates(&newNode)
}
} }
func deleteNode(w http.ResponseWriter, r *http.Request) { func deleteNode(w http.ResponseWriter, r *http.Request) {
@ -632,17 +619,48 @@ func deleteNode(w http.ResponseWriter, r *http.Request) {
returnSuccessResponse(w, r, nodeid+" deleted.") returnSuccessResponse(w, r, nodeid+" deleted.")
logger.Log(1, r.Header.Get("user"), "Deleted node", nodeid, "from network", params["network"]) logger.Log(1, r.Header.Get("user"), "Deleted node", nodeid, "from network", params["network"])
//:w
runUpdates(&node, false) runUpdates(&node, false)
} }
func runUpdates(node *models.Node) error { func runUpdates(node *models.Node, ifaceDelta bool) {
//don't publish to server node go func() { // don't block http response
if isServer(node) { if isServer(node) { // don't publish to server node
return errors.New("update to server node not permited") if err := runServerUpdate(node, ifaceDelta); err != nil {
logger.Log(1, "error running server update", err.Error())
}
} else { // publish node update if not server
if err := mq.NodeUpdate(node); err != nil {
logger.Log(1, "error publishing node update to node", node.Name, node.ID, err.Error())
}
}
}()
}
// updates local peers for a server on a given node's network
func runServerUpdate(serverNode *models.Node, ifaceDelta bool) error {
err := logic.TimerCheckpoint()
if err != nil {
logger.Log(3, "error occurred on timer,", err.Error())
} }
if err := mq.NodeUpdate(node); err != nil {
logger.Log(1, "error publishing node update", err.Error()) if servercfg.IsClientMode() != "on" {
return nil
}
if ifaceDelta {
if err := mq.PublishPeerUpdate(serverNode); err != nil {
logger.Log(1, "failed to publish peer update "+err.Error())
}
}
var currentServerNode, getErr = logic.GetNetworkServerLeader(serverNode.Network)
if err != nil {
return getErr
}
if err = logic.ServerUpdate(&currentServerNode, ifaceDelta); err != nil {
logger.Log(1, "server node:", currentServerNode.ID, "failed update")
return err return err
} }
return nil return nil

View file

@ -13,6 +13,7 @@ import (
"github.com/gravitl/netmaker/logic" "github.com/gravitl/netmaker/logic"
"github.com/gravitl/netmaker/models" "github.com/gravitl/netmaker/models"
"github.com/gravitl/netmaker/mq" "github.com/gravitl/netmaker/mq"
"github.com/gravitl/netmaker/servercfg"
"github.com/gravitl/netmaker/serverctl" "github.com/gravitl/netmaker/serverctl"
) )
@ -106,8 +107,6 @@ func (s *NodeServiceServer) CreateNode(ctx context.Context, req *nodepb.Object)
Type: nodepb.NODE_TYPE, Type: nodepb.NODE_TYPE,
} }
//runUpdates(&node, false, false)
go func(node *models.Node) { go func(node *models.Node) {
if node.UDPHolePunch == "yes" { if node.UDPHolePunch == "yes" {
var currentServerNode, getErr = logic.GetNetworkServerLeader(node.Network) var currentServerNode, getErr = logic.GetNetworkServerLeader(node.Network)
@ -133,47 +132,44 @@ func (s *NodeServiceServer) CreateNode(ctx context.Context, req *nodepb.Object)
} }
// NodeServiceServer.UpdateNode updates a node and responds over gRPC // NodeServiceServer.UpdateNode updates a node and responds over gRPC
// func (s *NodeServiceServer) UpdateNode(ctx context.Context, req *nodepb.Object) (*nodepb.Object, error) { // DELETE ONE DAY - DEPRECATED
func (s *NodeServiceServer) UpdateNode(ctx context.Context, req *nodepb.Object) (*nodepb.Object, error) {
// var newnode models.Node var newnode models.Node
// if err := json.Unmarshal([]byte(req.GetData()), &newnode); err != nil { if err := json.Unmarshal([]byte(req.GetData()), &newnode); err != nil {
// return nil, err return nil, err
// } }
// node, err := logic.GetNodeByID(newnode.ID) node, err := logic.GetNodeByID(newnode.ID)
// if err != nil { if err != nil {
// return nil, err return nil, err
// } }
// ifaceDelta := logic.IfaceDelta(&node, &newnode) if !servercfg.GetRce() {
newnode.PostDown = node.PostDown
newnode.PostUp = node.PostUp
}
// if !servercfg.GetRce() { err = logic.UpdateNode(&node, &newnode)
// newnode.PostDown = node.PostDown if err != nil {
// newnode.PostUp = node.PostUp return nil, err
// } }
newnode.NetworkSettings, err = logic.GetNetworkSettings(node.Network)
if err != nil {
return nil, err
}
getServerAddrs(&newnode)
// err = logic.UpdateNode(&node, &newnode) nodeData, errN := json.Marshal(&newnode)
// if err != nil { if errN != nil {
// return nil, err return nil, err
// } }
// newnode.NetworkSettings, err = logic.GetNetworkSettings(node.Network)
// if err != nil {
// return nil, err
// }
// getServerAddrs(&newnode)
// nodeData, errN := json.Marshal(&newnode) return &nodepb.Object{
// if errN != nil { Data: string(nodeData),
// return nil, err Type: nodepb.NODE_TYPE,
// } }, nil
}
// runUpdates(&newnode, false, ifaceDelta)
// return &nodepb.Object{
// Data: string(nodeData),
// Type: nodepb.NODE_TYPE,
// }, nil
// }
func getServerAddrs(node *models.Node) { func getServerAddrs(node *models.Node) {
serverNodes := logic.GetServerNodes(node.Network) serverNodes := logic.GetServerNodes(node.Network)
@ -218,7 +214,7 @@ func (s *NodeServiceServer) DeleteNode(ctx context.Context, req *nodepb.Object)
return nil, err return nil, err
} }
runServerPeerUpdate(&node, false) runUpdates(&node, false)
return &nodepb.Object{ return &nodepb.Object{
Data: "success", Data: "success",

View file

@ -36,7 +36,7 @@ func createRelay(w http.ResponseWriter, r *http.Request) {
} }
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(node) json.NewEncoder(w).Encode(node)
runUpdates(&node, true, false) runUpdates(&node, true)
} }
func deleteRelay(w http.ResponseWriter, r *http.Request) { func deleteRelay(w http.ResponseWriter, r *http.Request) {
@ -58,5 +58,5 @@ func deleteRelay(w http.ResponseWriter, r *http.Request) {
} }
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(node) json.NewEncoder(w).Encode(node)
runUpdates(&node, true, false) runUpdates(&node, true)
} }

View file

@ -1,31 +0,0 @@
package controller
import (
"github.com/gravitl/netmaker/logger"
"github.com/gravitl/netmaker/logic"
"github.com/gravitl/netmaker/models"
"github.com/gravitl/netmaker/servercfg"
)
// updates local peers for a server on a given node's network
func runServerUpdate(node *models.Node, ifaceDelta bool) error {
err := logic.TimerCheckpoint()
if err != nil {
logger.Log(3, "error occurred on timer,", err.Error())
}
if servercfg.IsClientMode() != "on" {
return nil
}
var currentServerNode, getErr = logic.GetNetworkServerLeader(node.Network)
if err != nil {
return getErr
}
if err = logic.ServerUpdate(&currentServerNode, ifaceDelta); err != nil {
logger.Log(1, "server node:", currentServerNode.ID, "failed update")
return err
}
return nil
}

View file

@ -71,7 +71,7 @@ func PublishExtPeerUpdate(node *models.Node) error {
// NodeUpdate -- publishes a node update // NodeUpdate -- publishes a node update
func NodeUpdate(node *models.Node) error { func NodeUpdate(node *models.Node) error {
if !servercfg.IsMessageQueueBackend() { if !servercfg.IsMessageQueueBackend() || node.IsServer == "yes" {
return nil return nil
} }
logger.Log(3, "publishing node update to "+node.Name) logger.Log(3, "publishing node update to "+node.Name)