diff --git a/controllers/node.go b/controllers/node.go index 712bcb8a..ab9cc533 100644 --- a/controllers/node.go +++ b/controllers/node.go @@ -12,7 +12,6 @@ import ( "github.com/gravitl/netmaker/logic" "github.com/gravitl/netmaker/models" "github.com/gravitl/netmaker/mq" - "github.com/gravitl/netmaker/netclient/ncutils" "github.com/gravitl/netmaker/servercfg" "golang.org/x/crypto/bcrypt" ) @@ -407,13 +406,11 @@ func createNode(w http.ResponseWriter, r *http.Request) { return } - if err = runServerPeerUpdate(node.Network, isServer(&node), "node create"); err != nil { - logger.Log(1, "internal error when creating node:", node.ID) - } - logger.Log(1, r.Header.Get("user"), "created new node", node.Name, "on network", node.Network) w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(node) + + runUpdates(&node, false) } //Takes node out of pending state @@ -427,20 +424,11 @@ func uncordonNode(w http.ResponseWriter, r *http.Request) { returnErrorResponse(w, r, formatError(err, "internal")) return } - if err = runServerPeerUpdate(node.Network, isServer(&node), "node uncordon"); err != nil { - logger.Log(1, "internal error when approving node:", nodeid) - } - go func() { - if err := mq.NodeUpdate(&node); err != nil { - logger.Log(1, "error publishing node update", err.Error()) - } - if err := mq.PublishPeerUpdate(&node); err != nil { - logger.Log(1, "error publishing peer update ", err.Error()) - } - }() logger.Log(1, r.Header.Get("user"), "uncordoned node", node.Name) w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode("SUCCESS") + + runUpdates(&node, true) } func createEgressGateway(w http.ResponseWriter, r *http.Request) { @@ -459,20 +447,12 @@ func createEgressGateway(w http.ResponseWriter, r *http.Request) { returnErrorResponse(w, r, formatError(err, "internal")) return } - if err = runServerPeerUpdate(gateway.NetID, isServer(&node), "node egress create"); err != nil { - logger.Log(1, "internal error when setting peers after creating egress on node:", gateway.NodeID) - } - go func() { - if err := mq.NodeUpdate(&node); err != nil { - logger.Log(1, "error publishing node update", err.Error()) - } - if err := mq.PublishPeerUpdate(&node); err != nil { - logger.Log(1, "error publishing peer update "+err.Error()) - } - }() + logger.Log(1, r.Header.Get("user"), "created egress gateway on node", gateway.NodeID, "on network", gateway.NetID) w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(node) + + runUpdates(&node, true) } func deleteEgressGateway(w http.ResponseWriter, r *http.Request) { @@ -485,20 +465,12 @@ func deleteEgressGateway(w http.ResponseWriter, r *http.Request) { returnErrorResponse(w, r, formatError(err, "internal")) return } - if err = runServerPeerUpdate(netid, isServer(&node), "egress delete"); err != nil { - logger.Log(1, "internal error when setting peers after removing egress on node:", nodeid) - } - go func() { - if err := mq.NodeUpdate(&node); err != nil { - logger.Log(1, "error publishing node update", err.Error()) - } - if err := mq.PublishPeerUpdate(&node); err != nil { - logger.Log(1, "error publishing peer update ", err.Error()) - } - }() + logger.Log(1, r.Header.Get("user"), "deleted egress gateway", nodeid, "on network", netid) w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(node) + + runUpdates(&node, true) } // == INGRESS == @@ -513,17 +485,12 @@ func createIngressGateway(w http.ResponseWriter, r *http.Request) { returnErrorResponse(w, r, formatError(err, "internal")) return } - go func() { - if err := mq.NodeUpdate(&node); err != nil { - logger.Log(1, "error publishing node update", err.Error()) - } - if err := mq.PublishPeerUpdate(&node); err != nil { - logger.Log(1, "error publishing peer update ", err.Error()) - } - }() + logger.Log(1, r.Header.Get("user"), "created ingress gateway on node", nodeid, "on network", netid) w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(node) + + runUpdates(&node, true) } func deleteIngressGateway(w http.ResponseWriter, r *http.Request) { @@ -535,17 +502,12 @@ func deleteIngressGateway(w http.ResponseWriter, r *http.Request) { returnErrorResponse(w, r, formatError(err, "internal")) return } - go func() { - if err := mq.NodeUpdate(&node); err != nil { - logger.Log(1, "error publishing node update", err.Error()) - } - if err := mq.PublishPeerUpdate(&node); err != nil { - logger.Log(1, "error publishing peer update ", err.Error()) - } - }() + logger.Log(1, r.Header.Get("user"), "deleted ingress gateway", nodeid) w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(node) + + runUpdates(&node, true) } func updateNode(w http.ResponseWriter, r *http.Request) { @@ -587,8 +549,6 @@ func updateNode(w http.ResponseWriter, r *http.Request) { newNode.PostUp = node.PostUp } - var shouldPeersUpdate = ncutils.IfaceDelta(&node, &newNode) - err = logic.UpdateNode(&node, &newNode) if err != nil { returnErrorResponse(w, r, formatError(err, "internal")) @@ -605,24 +565,11 @@ func updateNode(w http.ResponseWriter, r *http.Request) { err = logic.SetDNS() } - err = runServerPeerUpdate(node.Network, shouldPeersUpdate, "node update") - if err != nil { - returnErrorResponse(w, r, formatError(err, "internal")) - return - } logger.Log(1, r.Header.Get("user"), "updated node", node.MacAddress, "on network", node.Network) w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(newNode) - go func() { - if err := mq.NodeUpdate(&newNode); err != nil { - logger.Log(1, "error publishing node update", err.Error()) - } - if shouldPeersUpdate { - if err := mq.PublishPeerUpdate(&newNode); err != nil { - logger.Log(1, "error publishing peer update after node update", err.Error()) - } - } - }() + + runUpdates(&node, true) } func deleteNode(w http.ResponseWriter, r *http.Request) { @@ -643,17 +590,29 @@ func deleteNode(w http.ResponseWriter, r *http.Request) { return } - err = runServerPeerUpdate(node.Network, isServer(&node), "node delete") if err != nil { returnErrorResponse(w, r, formatError(err, "internal")) return } node.Action = models.NODE_DELETE - go func() { - if err := mq.NodeUpdate(&node); err != nil { - logger.Log(1, "error publishing node delete ", err.Error()) - } - }() logger.Log(1, r.Header.Get("user"), "Deleted node", nodeid, "from network", params["network"]) returnSuccessResponse(w, r, nodeid+" deleted.") + + runUpdates(&node, true) +} + +func runUpdates(node *models.Node, nodeUpdate bool) error { + if nodeUpdate { + if err := mq.NodeUpdate(node); err != nil { + logger.Log(1, "error publishing node update", err.Error()) + return err + } + } + + if err := runServerPeerUpdate(node, isServer(node)); err != nil { + logger.Log(1, "internal error when approving node:", node.ID) + return err + } + + return nil } diff --git a/controllers/node_grpc.go b/controllers/node_grpc.go index 76fe1bb7..924b2b51 100644 --- a/controllers/node_grpc.go +++ b/controllers/node_grpc.go @@ -11,8 +11,6 @@ import ( "github.com/gravitl/netmaker/logger" "github.com/gravitl/netmaker/logic" "github.com/gravitl/netmaker/models" - "github.com/gravitl/netmaker/mq" - "github.com/gravitl/netmaker/netclient/ncutils" "github.com/gravitl/netmaker/servercfg" ) @@ -68,7 +66,9 @@ func (s *NodeServiceServer) CreateNode(ctx context.Context, req *nodepb.Object) return nil, errors.New("invalid key, and network does not allow no-key signups") } } + getServerAddrs(&node) + key, keyErr := logic.RetrievePublicTrafficKey() if keyErr != nil { logger.Log(0, "error retrieving key: ", keyErr.Error()) @@ -95,26 +95,7 @@ func (s *NodeServiceServer) CreateNode(ctx context.Context, req *nodepb.Object) Type: nodepb.NODE_TYPE, } - network, err := logic.GetParentNetwork(node.Network) - if err != nil { - return nil, err - } - network.NodesLastModified = time.Now().Unix() - network.DefaultServerAddrs = node.NetworkSettings.DefaultServerAddrs - if err := logic.SaveNetwork(&network); err != nil { - return nil, err - } - err = runServerPeerUpdate(node.Network, isServer(&node), "node_grpc create") - if err != nil { - logger.Log(1, "internal error when setting peers after node,", node.ID, "was created (gRPC)") - } - logger.Log(0, "new node,", node.Name, ", added on network,"+node.Network) - // notify other nodes on network of new peer - go func() { - if err := mq.PublishPeerUpdate(&node); err != nil { - logger.Log(0, "failed to inform peers of new node ", err.Error()) - } - }() + runUpdates(&node, false) return response, nil } @@ -136,8 +117,9 @@ func (s *NodeServiceServer) UpdateNode(ctx context.Context, req *nodepb.Object) newnode.PostDown = node.PostDown newnode.PostUp = node.PostUp } - var shouldPeersUpdate = ncutils.IfaceDelta(&node, &newnode) + getServerAddrs(&node) + err = logic.UpdateNode(&node, &newnode) if err != nil { return nil, err @@ -150,10 +132,9 @@ func (s *NodeServiceServer) UpdateNode(ctx context.Context, req *nodepb.Object) if errN != nil { return nil, err } - err = runServerPeerUpdate(newnode.Network, shouldPeersUpdate, "node_grpc update") - if err != nil { - logger.Log(1, "could not update peers on gRPC after node,", newnode.ID, "updated (gRPC), \nerror:", err.Error()) - } + + runUpdates(&node, false) + return &nodepb.Object{ Data: string(nodeData), Type: nodepb.NODE_TYPE, @@ -170,7 +151,11 @@ func getServerAddrs(node *models.Node) { } } // TODO consolidate functionality around files + node.NetworkSettings.NodesLastModified = time.Now().Unix() node.NetworkSettings.DefaultServerAddrs = serverAddrs + if err := logic.SaveNetwork(&node.NetworkSettings); err != nil { + logger.Log(1, "unable to save network on serverAddr update", err.Error()) + } } // NodeServiceServer.DeleteNode - deletes a node and responds over gRPC @@ -185,16 +170,7 @@ func (s *NodeServiceServer) DeleteNode(ctx context.Context, req *nodepb.Object) if err != nil { return nil, err } - err = runServerPeerUpdate(node.Network, false, "node_grpc delete") - if err != nil { - logger.Log(1, "internal error when setting peers after deleting node:", node.ID, "over gRPC") - } - // notify other nodes on network of deleted peer - go func() { - if err := mq.PublishPeerUpdate(&node); err != nil { - logger.Log(0, "failed to inform peers of deleted node ", err.Error()) - } - }() + runServerPeerUpdate(&node, false) return &nodepb.Object{ Data: "success", diff --git a/controllers/relay.go b/controllers/relay.go index 185614fd..1d91c2ca 100644 --- a/controllers/relay.go +++ b/controllers/relay.go @@ -27,17 +27,14 @@ func createRelay(w http.ResponseWriter, r *http.Request) { returnErrorResponse(w, r, formatError(err, "internal")) return } - if err = runServerPeerUpdate(relay.NetID, isServer(&node), "relay create"); err != nil { + + if err := mq.NodeUpdate(&node); err != nil { + logger.Log(1, "error publishing node update", err.Error()) + } + + if err = runServerPeerUpdate(&node, isServer(&node)); err != nil { logger.Log(1, "internal error when creating relay on node:", relay.NodeID) } - go func() { - if err := mq.NodeUpdate(&node); err != nil { - logger.Log(1, "error publishing node update", err.Error()) - } - if err := mq.PublishPeerUpdate(&node); err != nil { - logger.Log(1, "error publishing peer update ", err.Error()) - } - }() logger.Log(1, r.Header.Get("user"), "created relay on node", relay.NodeID, "on network", relay.NetID) w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(node) @@ -53,7 +50,7 @@ func deleteRelay(w http.ResponseWriter, r *http.Request) { returnErrorResponse(w, r, formatError(err, "internal")) return } - if err = runServerPeerUpdate(netid, isServer(&node), "relay delete"); err != nil { + if err = runServerPeerUpdate(&node, isServer(&node)); err != nil { logger.Log(1, "internal error when deleting relay on node:", nodeid) } go func() { diff --git a/controllers/server_util.go b/controllers/server_util.go index 0a95bda7..d21795a8 100644 --- a/controllers/server_util.go +++ b/controllers/server_util.go @@ -3,19 +3,27 @@ package controller import ( "github.com/gravitl/netmaker/logger" "github.com/gravitl/netmaker/logic" + "github.com/gravitl/netmaker/models" + "github.com/gravitl/netmaker/mq" "github.com/gravitl/netmaker/servercfg" ) -func runServerPeerUpdate(network string, ifaceDelta bool, function string) error { - logger.Log(0, "running server update from function", function) +func runServerPeerUpdate(node *models.Node, ifaceDelta bool) error { + err := logic.TimerCheckpoint() if err != nil { logger.Log(3, "error occurred on timer,", err.Error()) } + if servercfg.IsMessageQueueBackend() { + if err := mq.PublishPeerUpdate(node); err != nil { + logger.Log(0, "failed to inform peers of new node ", err.Error()) + } + } + if servercfg.IsClientMode() != "on" { return nil } - var currentServerNodeID, getErr = logic.GetNetworkServerNodeID(network) + var currentServerNodeID, getErr = logic.GetNetworkServerNodeID(node.Network) if err != nil { return getErr } diff --git a/netclient/functions/daemon.go b/netclient/functions/daemon.go index 52e6d812..3bacd4b4 100644 --- a/netclient/functions/daemon.go +++ b/netclient/functions/daemon.go @@ -224,6 +224,10 @@ func NodeUpdate(client mqtt.Client, msg mqtt.Message) { ncutils.Log("error restarting wg after node update " + err.Error()) return } + if err = Resubscribe(client, &cfg); err != nil { + ncutils.Log("error resubscribing after interface change " + err.Error()) + return + } } else { ncutils.Log("syncing conf to " + file) err = wireguard.SyncWGQuickConf(cfg.Node.Interface, file)