From 922ebcb4abf51403f04535c0f731a4e513e83bf4 Mon Sep 17 00:00:00 2001 From: 0xdcarns Date: Wed, 16 Feb 2022 20:42:57 -0500 Subject: [PATCH] refacted running node updates --- controllers/network.go | 16 ++------ controllers/node.go | 82 +++++++++++++++++++++++--------------- controllers/node_grpc.go | 72 ++++++++++++++++----------------- controllers/relay.go | 4 +- controllers/server_util.go | 31 -------------- mq/publishers.go | 2 +- 6 files changed, 90 insertions(+), 117 deletions(-) delete mode 100644 controllers/server_util.go diff --git a/controllers/network.go b/controllers/network.go index 95683061..088d064c 100644 --- a/controllers/network.go +++ b/controllers/network.go @@ -3,7 +3,6 @@ package controller import ( "encoding/json" "errors" - "fmt" "net/http" "strings" @@ -12,7 +11,6 @@ import ( "github.com/gravitl/netmaker/logger" "github.com/gravitl/netmaker/logic" "github.com/gravitl/netmaker/models" - "github.com/gravitl/netmaker/mq" "github.com/gravitl/netmaker/servercfg" ) @@ -109,17 +107,9 @@ func keyUpdate(w http.ResponseWriter, r *http.Request) { return } for _, node := range nodes { - fmt.Println("updating node ", node.Name, " for a key update") - if err := mq.NodeUpdate(&node); err != nil { - logger.Log(2, "failed key update ", node.Name) - } + logger.Log(3, "updating node ", node.Name, " for a key update") + runUpdates(&node, true) } - node, err := logic.GetNetworkServerLeader(netname) - if err != nil { - logger.Log(2, "failed to get server node") - return - } - runUpdates(&node, false, false) } // Update a network @@ -184,7 +174,7 @@ func updateNetwork(w http.ResponseWriter, r *http.Request) { return } for _, node := range nodes { - runUpdates(&node, true, false) + runUpdates(&node, true) } } diff --git a/controllers/node.go b/controllers/node.go index b2156a9b..dfc0442a 100644 --- a/controllers/node.go +++ b/controllers/node.go @@ -2,7 +2,6 @@ package controller import ( "encoding/json" - "errors" "fmt" "net/http" "strings" @@ -396,8 +395,8 @@ func createNode(w http.ResponseWriter, r *http.Request) { validKey := logic.IsKeyValid(networkName, node.AccessKey) if !validKey { - //Check to see if network will allow manual sign up - //may want to switch this up with the valid key check and avoid a DB call that way. + // Check to see if network will allow manual sign up + // may want to switch this up with the valid key check and avoid a DB call that way. if network.AllowManualSignUp == "yes" { node.IsPending = "yes" } else { @@ -420,8 +419,8 @@ func createNode(w http.ResponseWriter, r *http.Request) { json.NewEncoder(w).Encode(node) } -//Takes node out of pending state -//TODO: May want to use cordon/uncordon terminology instead of "ispending". +// Takes node out of pending state +// TODO: May want to use cordon/uncordon terminology instead of "ispending". func uncordonNode(w http.ResponseWriter, r *http.Request) { var params = mux.Vars(r) w.Header().Set("Content-Type", "application/json") @@ -438,6 +437,8 @@ func uncordonNode(w http.ResponseWriter, r *http.Request) { mq.NodeUpdate(&node) } +// == EGRESS == + func createEgressGateway(w http.ResponseWriter, r *http.Request) { var gateway models.EgressGatewayRequest var params = mux.Vars(r) @@ -459,7 +460,7 @@ func createEgressGateway(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(node) - runUpdates(&node, true, false) + runUpdates(&node, true) } func deleteEgressGateway(w http.ResponseWriter, r *http.Request) { @@ -477,7 +478,7 @@ func deleteEgressGateway(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(node) - runUpdates(&node, true, false) + runUpdates(&node, true) } // == INGRESS == @@ -497,7 +498,7 @@ func createIngressGateway(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(node) - runUpdates(&node, true, false) + runUpdates(&node, true) } func deleteIngressGateway(w http.ResponseWriter, r *http.Request) { @@ -514,7 +515,7 @@ func deleteIngressGateway(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(node) - runUpdates(&node, true, false) + runUpdates(&node, true) } func updateNode(w http.ResponseWriter, r *http.Request) { @@ -570,10 +571,7 @@ func updateNode(w http.ResponseWriter, r *http.Request) { } if len(updatenodes) > 0 { for _, relayedNode := range updatenodes { - err = mq.NodeUpdate(&relayedNode) - if err != nil { - logger.Log(1, "error sending update to relayed node ", relayedNode.Address, "on network", node.Network, ": ", err.Error()) - } + runUpdates(&relayedNode, false) } } } @@ -586,18 +584,7 @@ func updateNode(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(newNode) - if isServer(&node) { - if ifaceDelta { - if err := mq.PublishPeerUpdate(&node); err != nil { - logger.Log(1, "failed to publish peer update "+err.Error()) - } - } - if err := logic.UpdateNode(&node, &newNode); err != nil { - logger.Log(1, "error updating server node "+err.Error()) - } - } else { - runUpdates(&newNode) - } + runUpdates(&newNode, ifaceDelta) } func deleteNode(w http.ResponseWriter, r *http.Request) { @@ -632,17 +619,48 @@ func deleteNode(w http.ResponseWriter, r *http.Request) { returnSuccessResponse(w, r, nodeid+" deleted.") logger.Log(1, r.Header.Get("user"), "Deleted node", nodeid, "from network", params["network"]) - //:w runUpdates(&node, false) } -func runUpdates(node *models.Node) error { - //don't publish to server node - if isServer(node) { - return errors.New("update to server node not permited") +func runUpdates(node *models.Node, ifaceDelta bool) { + go func() { // don't block http response + if isServer(node) { // don't publish to server node + if err := runServerUpdate(node, ifaceDelta); err != nil { + logger.Log(1, "error running server update", err.Error()) + } + } else { // publish node update if not server + if err := mq.NodeUpdate(node); err != nil { + logger.Log(1, "error publishing node update to node", node.Name, node.ID, err.Error()) + } + } + }() +} + +// updates local peers for a server on a given node's network +func runServerUpdate(serverNode *models.Node, ifaceDelta bool) error { + + err := logic.TimerCheckpoint() + if err != nil { + logger.Log(3, "error occurred on timer,", err.Error()) } - if err := mq.NodeUpdate(node); err != nil { - logger.Log(1, "error publishing node update", err.Error()) + + if servercfg.IsClientMode() != "on" { + return nil + } + + if ifaceDelta { + if err := mq.PublishPeerUpdate(serverNode); err != nil { + logger.Log(1, "failed to publish peer update "+err.Error()) + } + } + + var currentServerNode, getErr = logic.GetNetworkServerLeader(serverNode.Network) + if err != nil { + return getErr + } + + if err = logic.ServerUpdate(¤tServerNode, ifaceDelta); err != nil { + logger.Log(1, "server node:", currentServerNode.ID, "failed update") return err } return nil diff --git a/controllers/node_grpc.go b/controllers/node_grpc.go index f0e10e6e..d6a074b8 100644 --- a/controllers/node_grpc.go +++ b/controllers/node_grpc.go @@ -13,6 +13,7 @@ import ( "github.com/gravitl/netmaker/logic" "github.com/gravitl/netmaker/models" "github.com/gravitl/netmaker/mq" + "github.com/gravitl/netmaker/servercfg" "github.com/gravitl/netmaker/serverctl" ) @@ -106,8 +107,6 @@ func (s *NodeServiceServer) CreateNode(ctx context.Context, req *nodepb.Object) Type: nodepb.NODE_TYPE, } - //runUpdates(&node, false, false) - go func(node *models.Node) { if node.UDPHolePunch == "yes" { var currentServerNode, getErr = logic.GetNetworkServerLeader(node.Network) @@ -133,47 +132,44 @@ func (s *NodeServiceServer) CreateNode(ctx context.Context, req *nodepb.Object) } // NodeServiceServer.UpdateNode updates a node and responds over gRPC -// func (s *NodeServiceServer) UpdateNode(ctx context.Context, req *nodepb.Object) (*nodepb.Object, error) { +// DELETE ONE DAY - DEPRECATED +func (s *NodeServiceServer) UpdateNode(ctx context.Context, req *nodepb.Object) (*nodepb.Object, error) { -// var newnode models.Node -// if err := json.Unmarshal([]byte(req.GetData()), &newnode); err != nil { -// return nil, err -// } + var newnode models.Node + if err := json.Unmarshal([]byte(req.GetData()), &newnode); err != nil { + return nil, err + } -// node, err := logic.GetNodeByID(newnode.ID) -// if err != nil { -// return nil, err -// } + node, err := logic.GetNodeByID(newnode.ID) + if err != nil { + return nil, err + } -// ifaceDelta := logic.IfaceDelta(&node, &newnode) + if !servercfg.GetRce() { + newnode.PostDown = node.PostDown + newnode.PostUp = node.PostUp + } -// if !servercfg.GetRce() { -// newnode.PostDown = node.PostDown -// newnode.PostUp = node.PostUp -// } + err = logic.UpdateNode(&node, &newnode) + if err != nil { + return nil, err + } + newnode.NetworkSettings, err = logic.GetNetworkSettings(node.Network) + if err != nil { + return nil, err + } + getServerAddrs(&newnode) -// err = logic.UpdateNode(&node, &newnode) -// if err != nil { -// return nil, err -// } -// newnode.NetworkSettings, err = logic.GetNetworkSettings(node.Network) -// if err != nil { -// return nil, err -// } -// getServerAddrs(&newnode) + nodeData, errN := json.Marshal(&newnode) + if errN != nil { + return nil, err + } -// nodeData, errN := json.Marshal(&newnode) -// if errN != nil { -// return nil, err -// } - -// runUpdates(&newnode, false, ifaceDelta) - -// return &nodepb.Object{ -// Data: string(nodeData), -// Type: nodepb.NODE_TYPE, -// }, nil -// } + return &nodepb.Object{ + Data: string(nodeData), + Type: nodepb.NODE_TYPE, + }, nil +} func getServerAddrs(node *models.Node) { serverNodes := logic.GetServerNodes(node.Network) @@ -218,7 +214,7 @@ func (s *NodeServiceServer) DeleteNode(ctx context.Context, req *nodepb.Object) return nil, err } - runServerPeerUpdate(&node, false) + runUpdates(&node, false) return &nodepb.Object{ Data: "success", diff --git a/controllers/relay.go b/controllers/relay.go index 4c3d4fa0..49a4cfd8 100644 --- a/controllers/relay.go +++ b/controllers/relay.go @@ -36,7 +36,7 @@ func createRelay(w http.ResponseWriter, r *http.Request) { } w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(node) - runUpdates(&node, true, false) + runUpdates(&node, true) } func deleteRelay(w http.ResponseWriter, r *http.Request) { @@ -58,5 +58,5 @@ func deleteRelay(w http.ResponseWriter, r *http.Request) { } w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(node) - runUpdates(&node, true, false) + runUpdates(&node, true) } diff --git a/controllers/server_util.go b/controllers/server_util.go deleted file mode 100644 index 345aa111..00000000 --- a/controllers/server_util.go +++ /dev/null @@ -1,31 +0,0 @@ -package controller - -import ( - "github.com/gravitl/netmaker/logger" - "github.com/gravitl/netmaker/logic" - "github.com/gravitl/netmaker/models" - "github.com/gravitl/netmaker/servercfg" -) - -// updates local peers for a server on a given node's network -func runServerUpdate(node *models.Node, ifaceDelta bool) error { - - err := logic.TimerCheckpoint() - if err != nil { - logger.Log(3, "error occurred on timer,", err.Error()) - } - - if servercfg.IsClientMode() != "on" { - return nil - } - var currentServerNode, getErr = logic.GetNetworkServerLeader(node.Network) - if err != nil { - return getErr - } - - if err = logic.ServerUpdate(¤tServerNode, ifaceDelta); err != nil { - logger.Log(1, "server node:", currentServerNode.ID, "failed update") - return err - } - return nil -} diff --git a/mq/publishers.go b/mq/publishers.go index b7b32e2d..35177934 100644 --- a/mq/publishers.go +++ b/mq/publishers.go @@ -71,7 +71,7 @@ func PublishExtPeerUpdate(node *models.Node) error { // NodeUpdate -- publishes a node update func NodeUpdate(node *models.Node) error { - if !servercfg.IsMessageQueueBackend() { + if !servercfg.IsMessageQueueBackend() || node.IsServer == "yes" { return nil } logger.Log(3, "publishing node update to "+node.Name)