From 867e253c3b9c7d08cb45257e7ec024a0b41d113d Mon Sep 17 00:00:00 2001 From: 0xdcarns Date: Wed, 16 Feb 2022 15:40:51 -0500 Subject: [PATCH 1/5] refactored and cleaned up code, added peer update from clients --- mq/handlers.go | 105 ++++++++++++++++ mq/mq.go | 227 +--------------------------------- mq/publishers.go | 125 +++++++++++++++++++ mq/util.go | 11 ++ netclient/functions/daemon.go | 227 +++++++++++++++------------------- 5 files changed, 347 insertions(+), 348 deletions(-) create mode 100644 mq/handlers.go create mode 100644 mq/publishers.go diff --git a/mq/handlers.go b/mq/handlers.go new file mode 100644 index 00000000..8f624ad9 --- /dev/null +++ b/mq/handlers.go @@ -0,0 +1,105 @@ +package mq + +import ( + "encoding/json" + + mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/gravitl/netmaker/database" + "github.com/gravitl/netmaker/logger" + "github.com/gravitl/netmaker/logic" + "github.com/gravitl/netmaker/models" +) + +// DefaultHandler default message queue handler - only called when GetDebug == true +func DefaultHandler(client mqtt.Client, msg mqtt.Message) { + logger.Log(0, "MQTT Message: Topic: ", string(msg.Topic()), " Message: ", string(msg.Payload())) +} + +// Ping message Handler -- handles ping topic from client nodes +func Ping(client mqtt.Client, msg mqtt.Message) { + logger.Log(0, "Ping Handler: ", msg.Topic()) + go func() { + id, err := getID(msg.Topic()) + if err != nil { + logger.Log(0, "error getting node.ID sent on ping topic ") + return + } + node, err := logic.GetNodeByID(id) + if err != nil { + logger.Log(0, "mq-ping error getting node: ", err.Error()) + record, err := database.FetchRecord(database.NODES_TABLE_NAME, id) + if err != nil { + logger.Log(0, "error reading database ", err.Error()) + return + } + logger.Log(0, "record from database") + logger.Log(0, record) + return + } + _, decryptErr := decryptMsg(&node, msg.Payload()) + if decryptErr != nil { + logger.Log(0, "error decrypting when updating node ", node.ID, decryptErr.Error()) + return + } + node.SetLastCheckIn() + if err := logic.UpdateNode(&node, &node); err != nil { + logger.Log(0, "error updating node", node.Name, node.ID, " on checkin", err.Error()) + return + } + logger.Log(3, "ping processed for node", node.ID) + // --TODO --set client version once feature is implemented. + //node.SetClientVersion(msg.Payload()) + }() +} + +// UpdateNode message Handler -- handles updates from client nodes +func UpdateNode(client mqtt.Client, msg mqtt.Message) { + go func() { + id, err := getID(msg.Topic()) + if err != nil { + logger.Log(1, "error getting node.ID sent on ", msg.Topic(), err.Error()) + return + } + currentNode, err := logic.GetNodeByID(id) + if err != nil { + logger.Log(1, "error getting node ", id, err.Error()) + return + } + decrypted, decryptErr := decryptMsg(¤tNode, msg.Payload()) + if decryptErr != nil { + logger.Log(1, "failed to decrypt message for node ", id, decryptErr.Error()) + return + } + var newNode models.Node + if err := json.Unmarshal(decrypted, &newNode); err != nil { + logger.Log(1, "error unmarshaling payload ", err.Error()) + return + } + if err := logic.UpdateNode(¤tNode, &newNode); err != nil { + logger.Log(1, "error saving node", err.Error()) + return + } + logger.Log(1, "updated node", id, newNode.Name) + }() +} + +// ClientPeerUpdate message handler -- handles updating peers after signal from client nodes +func ClientPeerUpdate(client mqtt.Client, msg mqtt.Message) { + go func() { + id, err := getID(msg.Topic()) + if err != nil { + logger.Log(1, "error getting node.ID sent on ", msg.Topic(), err.Error()) + return + } + currentNode, err := logic.GetNodeByID(id) + if err != nil { + logger.Log(1, "error getting node ", id, err.Error()) + return + } + if err := PublishPeerUpdate(¤tNode); err != nil { + logger.Log(1, "error publishing peer update ", err.Error()) + return + } + logger.Log(1, "sent peer updates after signal received from", id, currentNode.Name) + }() +} diff --git a/mq/mq.go b/mq/mq.go index 8c64ba10..6ce8ec3d 100644 --- a/mq/mq.go +++ b/mq/mq.go @@ -2,18 +2,11 @@ package mq import ( "context" - "encoding/json" - "errors" - "fmt" "log" - "strings" "time" mqtt "github.com/eclipse/paho.mqtt.golang" - "github.com/gravitl/netmaker/database" "github.com/gravitl/netmaker/logger" - "github.com/gravitl/netmaker/logic" - "github.com/gravitl/netmaker/models" "github.com/gravitl/netmaker/netclient/ncutils" "github.com/gravitl/netmaker/servercfg" ) @@ -25,171 +18,6 @@ const MQ_DISCONNECT = 250 var peer_force_send = 0 -// DefaultHandler default message queue handler - only called when GetDebug == true -func DefaultHandler(client mqtt.Client, msg mqtt.Message) { - logger.Log(0, "MQTT Message: Topic: ", string(msg.Topic()), " Message: ", string(msg.Payload())) -} - -// Ping message Handler -- handles ping topic from client nodes -func Ping(client mqtt.Client, msg mqtt.Message) { - logger.Log(0, "Ping Handler: ", msg.Topic()) - go func() { - id, err := GetID(msg.Topic()) - if err != nil { - logger.Log(0, "error getting node.ID sent on ping topic ") - return - } - node, err := logic.GetNodeByID(id) - if err != nil { - logger.Log(0, "mq-ping error getting node: ", err.Error()) - record, err := database.FetchRecord(database.NODES_TABLE_NAME, id) - if err != nil { - logger.Log(0, "error reading database ", err.Error()) - return - } - logger.Log(0, "record from database") - logger.Log(0, record) - return - } - _, decryptErr := decryptMsg(&node, msg.Payload()) - if decryptErr != nil { - logger.Log(0, "error decrypting when updating node ", node.ID, decryptErr.Error()) - return - } - node.SetLastCheckIn() - if err := logic.UpdateNode(&node, &node); err != nil { - logger.Log(0, "error updating node", node.Name, node.ID, " on checkin", err.Error()) - return - } - logger.Log(3, "ping processed for node", node.ID) - // --TODO --set client version once feature is implemented. - //node.SetClientVersion(msg.Payload()) - }() -} - -// UpdateNode message Handler -- handles updates from client nodes -func UpdateNode(client mqtt.Client, msg mqtt.Message) { - go func() { - id, err := GetID(msg.Topic()) - if err != nil { - logger.Log(1, "error getting node.ID sent on ", msg.Topic(), err.Error()) - return - } - currentNode, err := logic.GetNodeByID(id) - if err != nil { - logger.Log(1, "error getting node ", id, err.Error()) - return - } - decrypted, decryptErr := decryptMsg(¤tNode, msg.Payload()) - if decryptErr != nil { - logger.Log(1, "failed to decrypt message for node ", id, decryptErr.Error()) - return - } - var newNode models.Node - if err := json.Unmarshal(decrypted, &newNode); err != nil { - logger.Log(1, "error unmarshaling payload ", err.Error()) - return - } - if err := logic.UpdateNode(¤tNode, &newNode); err != nil { - logger.Log(1, "error saving node", err.Error()) - return - } - if err := PublishPeerUpdate(&newNode); err != nil { - logger.Log(1, "error publishing peer update ", err.Error()) - return - } - logger.Log(1, "Updated node", id, newNode.Name) - }() -} - -// PublishPeerUpdate --- deterines and publishes a peer update to all the peers of a node -func PublishPeerUpdate(newNode *models.Node) error { - if !servercfg.IsMessageQueueBackend() { - return nil - } - networkNodes, err := logic.GetNetworkNodes(newNode.Network) - if err != nil { - logger.Log(1, "err getting Network Nodes", err.Error()) - return err - } - for _, node := range networkNodes { - - if node.IsServer == "yes" || node.ID == newNode.ID { - continue - } - peerUpdate, err := logic.GetPeerUpdate(&node) - if err != nil { - logger.Log(1, "error getting peer update for node", node.ID, err.Error()) - continue - } - data, err := json.Marshal(&peerUpdate) - if err != nil { - logger.Log(2, "error marshaling peer update for node", node.ID, err.Error()) - continue - } - if err = publish(&node, fmt.Sprintf("peers/%s/%s", node.Network, node.ID), data); err != nil { - logger.Log(1, "failed to publish peer update for node", node.ID) - } else { - logger.Log(1, "sent peer update for node", node.Name, "on network:", node.Network) - } - } - return nil -} - -// PublishPeerUpdate --- deterines and publishes a peer update to all the peers of a node -func PublishExtPeerUpdate(node *models.Node) error { - var err error - if logic.IsLocalServer(node) { - if err = logic.ServerUpdate(node, false); err != nil { - logger.Log(1, "server node:", node.ID, "failed to update peers with ext clients") - return err - } else { - return nil - } - } - if !servercfg.IsMessageQueueBackend() { - return nil - } - peerUpdate, err := logic.GetPeerUpdate(node) - if err != nil { - return err - } - data, err := json.Marshal(&peerUpdate) - if err != nil { - return err - } - return publish(node, fmt.Sprintf("peers/%s/%s", node.Network, node.ID), data) -} - -// GetID -- decodes a message queue topic and returns the embedded node.ID -func GetID(topic string) (string, error) { - parts := strings.Split(topic, "/") - count := len(parts) - if count == 1 { - return "", errors.New("invalid topic") - } - //the last part of the topic will be the node.ID - return parts[count-1], nil -} - -// NodeUpdate -- publishes a node update -func NodeUpdate(node *models.Node) error { - if !servercfg.IsMessageQueueBackend() { - return nil - } - logger.Log(3, "publishing node update to "+node.Name) - data, err := json.Marshal(node) - if err != nil { - logger.Log(2, "error marshalling node update ", err.Error()) - return err - } - if err = publish(node, fmt.Sprintf("update/%s/%s", node.Network, node.ID), data); err != nil { - logger.Log(2, "error publishing node update to peer ", node.ID, err.Error()) - return err - } - return nil -} - // SetupMQTT creates a connection to broker and return client func SetupMQTT(publish bool) mqtt.Client { opts := mqtt.NewClientOptions() @@ -217,6 +45,10 @@ func SetupMQTT(publish bool) mqtt.Client { client.Disconnect(240) logger.Log(0, "node update subscription failed") } + if token := client.Subscribe("clients/#", 0, mqtt.MessageHandler(ClientPeerUpdate)); token.Wait() && token.Error() != nil { + client.Disconnect(240) + logger.Log(0, "node client subscription failed") + } opts.SetOrderMatters(true) opts.SetResumeSubs(true) @@ -249,54 +81,3 @@ func Keepalive(ctx context.Context) { } } } - -// sendPeers - retrieve networks, send peer ports to all peers -func sendPeers() { - var force bool - peer_force_send++ - if peer_force_send == 5 { - force = true - peer_force_send = 0 - } - networks, err := logic.GetNetworks() - if err != nil { - logger.Log(1, "error retrieving networks for keepalive", err.Error()) - } - for _, network := range networks { - serverNode, errN := logic.GetNetworkServerLeader(network.NetID) - if errN == nil { - serverNode.SetLastCheckIn() - logic.UpdateNode(&serverNode, &serverNode) - if network.DefaultUDPHolePunch == "yes" { - if logic.ShouldPublishPeerPorts(&serverNode) || force { - if force { - logger.Log(2, "sending scheduled peer update (5 min)") - } - err = PublishPeerUpdate(&serverNode) - if err != nil { - logger.Log(1, "error publishing udp port updates for network", network.NetID) - logger.Log(1, errN.Error()) - } - } - } - } else { - logger.Log(1, "unable to retrieve leader for network ", network.NetID) - logger.Log(1, errN.Error()) - continue - } - } -} - -// func publishServerKeepalive(client mqtt.Client, network *models.Network) { -// nodes, err := logic.GetNetworkNodes(network.NetID) -// if err != nil { -// return -// } -// for _, node := range nodes { -// if token := client.Publish(fmt.Sprintf("serverkeepalive/%s/%s", network.NetID, node.ID), 0, false, servercfg.GetVersion()); token.Wait() && token.Error() != nil { -// logger.Log(1, "error publishing server keepalive for network", network.NetID, token.Error().Error()) -// } else { -// logger.Log(2, "keepalive sent for network/node", network.NetID, node.ID) -// } -// } -// } diff --git a/mq/publishers.go b/mq/publishers.go new file mode 100644 index 00000000..b7b32e2d --- /dev/null +++ b/mq/publishers.go @@ -0,0 +1,125 @@ +package mq + +import ( + "encoding/json" + "fmt" + + "github.com/gravitl/netmaker/logger" + "github.com/gravitl/netmaker/logic" + "github.com/gravitl/netmaker/models" + "github.com/gravitl/netmaker/servercfg" +) + +// PublishPeerUpdate --- deterines and publishes a peer update to all the peers of a node +func PublishPeerUpdate(newNode *models.Node) error { + if !servercfg.IsMessageQueueBackend() { + return nil + } + networkNodes, err := logic.GetNetworkNodes(newNode.Network) + if err != nil { + logger.Log(1, "err getting Network Nodes", err.Error()) + return err + } + for _, node := range networkNodes { + + if node.IsServer == "yes" || node.ID == newNode.ID { + continue + } + peerUpdate, err := logic.GetPeerUpdate(&node) + if err != nil { + logger.Log(1, "error getting peer update for node", node.ID, err.Error()) + continue + } + data, err := json.Marshal(&peerUpdate) + if err != nil { + logger.Log(2, "error marshaling peer update for node", node.ID, err.Error()) + continue + } + if err = publish(&node, fmt.Sprintf("peers/%s/%s", node.Network, node.ID), data); err != nil { + logger.Log(1, "failed to publish peer update for node", node.ID) + } else { + logger.Log(1, "sent peer update for node", node.Name, "on network:", node.Network) + } + } + return nil +} + +// PublishPeerUpdate --- publishes a peer update to all the peers of a node +func PublishExtPeerUpdate(node *models.Node) error { + var err error + if logic.IsLocalServer(node) { + if err = logic.ServerUpdate(node, false); err != nil { + logger.Log(1, "server node:", node.ID, "failed to update peers with ext clients") + return err + } else { + return nil + } + } + if !servercfg.IsMessageQueueBackend() { + return nil + } + peerUpdate, err := logic.GetPeerUpdate(node) + if err != nil { + return err + } + data, err := json.Marshal(&peerUpdate) + if err != nil { + return err + } + return publish(node, fmt.Sprintf("peers/%s/%s", node.Network, node.ID), data) +} + +// NodeUpdate -- publishes a node update +func NodeUpdate(node *models.Node) error { + if !servercfg.IsMessageQueueBackend() { + return nil + } + logger.Log(3, "publishing node update to "+node.Name) + data, err := json.Marshal(node) + if err != nil { + logger.Log(2, "error marshalling node update ", err.Error()) + return err + } + if err = publish(node, fmt.Sprintf("update/%s/%s", node.Network, node.ID), data); err != nil { + logger.Log(2, "error publishing node update to peer ", node.ID, err.Error()) + return err + } + return nil +} + +// sendPeers - retrieve networks, send peer ports to all peers +func sendPeers() { + var force bool + peer_force_send++ + if peer_force_send == 5 { + force = true + peer_force_send = 0 + } + networks, err := logic.GetNetworks() + if err != nil { + logger.Log(1, "error retrieving networks for keepalive", err.Error()) + } + for _, network := range networks { + serverNode, errN := logic.GetNetworkServerLeader(network.NetID) + if errN == nil { + serverNode.SetLastCheckIn() + logic.UpdateNode(&serverNode, &serverNode) + if network.DefaultUDPHolePunch == "yes" { + if logic.ShouldPublishPeerPorts(&serverNode) || force { + if force { + logger.Log(2, "sending scheduled peer update (5 min)") + } + err = PublishPeerUpdate(&serverNode) + if err != nil { + logger.Log(1, "error publishing udp port updates for network", network.NetID) + logger.Log(1, errN.Error()) + } + } + } + } else { + logger.Log(1, "unable to retrieve leader for network ", network.NetID) + logger.Log(1, errN.Error()) + continue + } + } +} diff --git a/mq/util.go b/mq/util.go index 8c7b2e01..8421021c 100644 --- a/mq/util.go +++ b/mq/util.go @@ -70,3 +70,14 @@ func publish(node *models.Node, dest string, msg []byte) error { } return nil } + +// decodes a message queue topic and returns the embedded node.ID +func getID(topic string) (string, error) { + parts := strings.Split(topic, "/") + count := len(parts) + if count == 1 { + return "", fmt.Errorf("invalid topic") + } + //the last part of the topic will be the node.ID + return parts[count-1], nil +} diff --git a/netclient/functions/daemon.go b/netclient/functions/daemon.go index f2d3b612..7552de99 100644 --- a/netclient/functions/daemon.go +++ b/netclient/functions/daemon.go @@ -88,94 +88,6 @@ func Daemon() error { } -// SetupMQTT creates a connection to broker and return client -func SetupMQTT(cfg *config.ClientConfig, publish bool) mqtt.Client { - opts := mqtt.NewClientOptions() - server := getServerAddress(cfg) - opts.AddBroker(server + ":1883") - id := ncutils.MakeRandomString(23) - opts.ClientID = id - opts.SetDefaultPublishHandler(All) - opts.SetAutoReconnect(true) - opts.SetConnectRetry(true) - opts.SetConnectRetryInterval(time.Second << 2) - opts.SetKeepAlive(time.Minute >> 1) - opts.SetWriteTimeout(time.Minute) - opts.SetOnConnectHandler(func(client mqtt.Client) { - if !publish { - if cfg.DebugOn { - if token := client.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil { - ncutils.Log(token.Error().Error()) - return - } - ncutils.Log("subscribed to all topics for debugging purposes") - } - if token := client.Subscribe(fmt.Sprintf("update/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(NodeUpdate)); token.Wait() && token.Error() != nil { - ncutils.Log(token.Error().Error()) - return - } - if cfg.DebugOn { - ncutils.Log(fmt.Sprintf("subscribed to node updates for node %s update/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID)) - } - if token := client.Subscribe(fmt.Sprintf("peers/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(UpdatePeers)); token.Wait() && token.Error() != nil { - ncutils.Log(token.Error().Error()) - return - } - if cfg.DebugOn { - ncutils.Log(fmt.Sprintf("subscribed to peer updates for node %s peers/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID)) - } - opts.SetOrderMatters(true) - opts.SetResumeSubs(true) - } - }) - opts.SetConnectionLostHandler(func(c mqtt.Client, e error) { - ncutils.Log("detected broker connection lost, running pull for " + cfg.Node.Network) - _, err := Pull(cfg.Node.Network, true) - if err != nil { - ncutils.Log("could not run pull, server unreachable: " + err.Error()) - ncutils.Log("waiting to retry...") - /* - //Consider putting in logic to restart - daemon may take long time to refresh - time.Sleep(time.Minute * 5) - ncutils.Log("restarting netclient") - daemon.Restart() - */ - } - ncutils.Log("connection re-established with mqtt server") - }) - - client := mqtt.NewClient(opts) - tperiod := time.Now().Add(12 * time.Second) - for { - //if after 12 seconds, try a gRPC pull on the last try - if time.Now().After(tperiod) { - ncutils.Log("running pull for " + cfg.Node.Network) - _, err := Pull(cfg.Node.Network, true) - if err != nil { - ncutils.Log("could not run pull, exiting " + cfg.Node.Network + " setup: " + err.Error()) - return client - } - time.Sleep(time.Second) - } - if token := client.Connect(); token.Wait() && token.Error() != nil { - ncutils.Log("unable to connect to broker, retrying ...") - if time.Now().After(tperiod) { - ncutils.Log("could not connect to broker, exiting " + cfg.Node.Network + " setup: " + token.Error().Error()) - if strings.Contains(token.Error().Error(), "connectex") || strings.Contains(token.Error().Error(), "i/o timeout") { - ncutils.PrintLog("connection issue detected.. pulling and restarting daemon", 0) - Pull(cfg.Node.Network, true) - daemon.Restart() - } - return client - } - } else { - break - } - time.Sleep(2 * time.Second) - } - return client -} - // MessageQueue sets up Message Queue and subsribes/publishes updates to/from server func MessageQueue(ctx context.Context, network string) { ncutils.Log("netclient go routine started for " + network) @@ -185,7 +97,7 @@ func MessageQueue(ctx context.Context, network string) { cfg.ReadConfig() ncutils.Log("daemon started for network: " + network) - client := SetupMQTT(&cfg, false) + client := setupMQTT(&cfg, false) defer client.Disconnect(250) wg := &sync.WaitGroup{} @@ -287,15 +199,15 @@ func NodeUpdate(client mqtt.Client, msg mqtt.Message) { ncutils.Log("error updating wireguard config " + err.Error()) return } - if ifaceDelta { + if ifaceDelta { // if a change caused an ifacedelta we need to notify the server to update the peers ncutils.Log("applying WG conf to " + file) err = wireguard.ApplyConf(&cfg.Node, cfg.Node.Interface, file) if err != nil { ncutils.Log("error restarting wg after node update " + err.Error()) return } - time.Sleep(time.Second >> 1) + time.Sleep(time.Second >> 1) if newNode.DNSOn == "yes" { for _, server := range newNode.NetworkSettings.DefaultServerAddrs { if server.IsLeader { @@ -304,6 +216,7 @@ func NodeUpdate(client mqtt.Client, msg mqtt.Message) { } } } + publishClientPeers(&cfg) } //deal with DNS if newNode.DNSOn != "yes" && shouldDNSChange && cfg.Node.Interface != "" { @@ -361,39 +274,6 @@ func UpdatePeers(client mqtt.Client, msg mqtt.Message) { } } -// MonitorKeepalive - checks time last server keepalive received. If more than 3+ minutes, notify and resubscribe -// func MonitorKeepalive(ctx context.Context, wg *sync.WaitGroup, client mqtt.Client, cfg *config.ClientConfig) { -// defer wg.Done() -// for { -// select { -// case <-ctx.Done(): -// ncutils.Log("cancel recieved, monitor keepalive exiting") -// return -// case <-time.After(time.Second * 150): -// var keepalivetime time.Time -// keepaliveval, ok := keepalive.Load(cfg.Node.Network) -// if ok { -// keepalivetime = keepaliveval.(time.Time) -// if !keepalivetime.IsZero() && time.Since(keepalivetime) > time.Second*120 { // more than 2+ minutes -// // ncutils.Log("server keepalive not recieved recently, resubscribe to message queue") -// // err := Resubscribe(client, cfg) -// // if err != nil { -// // ncutils.Log("closing " + err.Error()) -// // } -// ncutils.Log("maybe wanna call something") -// } -// } -// } -// } -// } - -// ServerKeepAlive -- handler to react to keepalive messages published by server -// func ServerKeepAlive(client mqtt.Client, msg mqtt.Message) { -// var currentTime = time.Now() -// keepalive.Store(parseNetworkFromTopic(msg.Topic()), currentTime) -// ncutils.PrintLog("received server keepalive at "+currentTime.String(), 2) -// } - // UpdateKeys -- updates private key and returns new publickey func UpdateKeys(cfg *config.ClientConfig, client mqtt.Client) error { ncutils.Log("received message to update keys") @@ -508,6 +388,103 @@ func Hello(cfg *config.ClientConfig, network string) { // == Private == +// setupMQTT creates a connection to broker and return client +func setupMQTT(cfg *config.ClientConfig, publish bool) mqtt.Client { + opts := mqtt.NewClientOptions() + server := getServerAddress(cfg) + opts.AddBroker(server + ":1883") + id := ncutils.MakeRandomString(23) + opts.ClientID = id + opts.SetDefaultPublishHandler(All) + opts.SetAutoReconnect(true) + opts.SetConnectRetry(true) + opts.SetConnectRetryInterval(time.Second << 2) + opts.SetKeepAlive(time.Minute >> 1) + opts.SetWriteTimeout(time.Minute) + opts.SetOnConnectHandler(func(client mqtt.Client) { + if !publish { + if cfg.DebugOn { + if token := client.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil { + ncutils.Log(token.Error().Error()) + return + } + ncutils.Log("subscribed to all topics for debugging purposes") + } + if token := client.Subscribe(fmt.Sprintf("update/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(NodeUpdate)); token.Wait() && token.Error() != nil { + ncutils.Log(token.Error().Error()) + return + } + if cfg.DebugOn { + ncutils.Log(fmt.Sprintf("subscribed to node updates for node %s update/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID)) + } + if token := client.Subscribe(fmt.Sprintf("peers/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(UpdatePeers)); token.Wait() && token.Error() != nil { + ncutils.Log(token.Error().Error()) + return + } + if cfg.DebugOn { + ncutils.Log(fmt.Sprintf("subscribed to peer updates for node %s peers/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID)) + } + opts.SetOrderMatters(true) + opts.SetResumeSubs(true) + } + }) + opts.SetConnectionLostHandler(func(c mqtt.Client, e error) { + ncutils.Log("detected broker connection lost, running pull for " + cfg.Node.Network) + _, err := Pull(cfg.Node.Network, true) + if err != nil { + ncutils.Log("could not run pull, server unreachable: " + err.Error()) + ncutils.Log("waiting to retry...") + /* + //Consider putting in logic to restart - daemon may take long time to refresh + time.Sleep(time.Minute * 5) + ncutils.Log("restarting netclient") + daemon.Restart() + */ + } + ncutils.Log("connection re-established with mqtt server") + }) + + client := mqtt.NewClient(opts) + tperiod := time.Now().Add(12 * time.Second) + for { + //if after 12 seconds, try a gRPC pull on the last try + if time.Now().After(tperiod) { + ncutils.Log("running pull for " + cfg.Node.Network) + _, err := Pull(cfg.Node.Network, true) + if err != nil { + ncutils.Log("could not run pull, exiting " + cfg.Node.Network + " setup: " + err.Error()) + return client + } + time.Sleep(time.Second) + } + if token := client.Connect(); token.Wait() && token.Error() != nil { + ncutils.Log("unable to connect to broker, retrying ...") + if time.Now().After(tperiod) { + ncutils.Log("could not connect to broker, exiting " + cfg.Node.Network + " setup: " + token.Error().Error()) + if strings.Contains(token.Error().Error(), "connectex") || strings.Contains(token.Error().Error(), "i/o timeout") { + ncutils.PrintLog("connection issue detected.. pulling and restarting daemon", 0) + Pull(cfg.Node.Network, true) + daemon.Restart() + } + return client + } + } else { + break + } + time.Sleep(2 * time.Second) + } + return client +} + +// publishes a message to server to update peers on this peer's behalf +func publishClientPeers(cfg *config.ClientConfig) error { + payload := []byte(ncutils.MakeRandomString(16)) // just random string for now to keep the bytes different + if err := publish(cfg, fmt.Sprintf("update/%s", cfg.Node.ID), payload); err != nil { + return err + } + return nil +} + func initialPull(network string) { ncutils.Log("pulling latest config for " + network) var configPath = fmt.Sprintf("%snetconfig-%s", ncutils.GetNetclientPathSpecific(), network) @@ -548,7 +525,7 @@ func publish(cfg *config.ClientConfig, dest string, msg []byte) error { return err } - client := SetupMQTT(cfg, true) + client := setupMQTT(cfg, true) defer client.Disconnect(250) encrypted, err := ncutils.Chunk(msg, serverPubKey, trafficPrivKey) if err != nil { From 407e46c1173580504b9c7c895d4c21b7b6709c0a Mon Sep 17 00:00:00 2001 From: 0xdcarns Date: Wed, 16 Feb 2022 16:23:18 -0500 Subject: [PATCH 2/5] began servside refactor --- controllers/node.go | 15 ++++----------- controllers/server_util.go | 9 +++------ mq/mq.go | 2 +- netclient/functions/daemon.go | 12 +++++------- 4 files changed, 13 insertions(+), 25 deletions(-) diff --git a/controllers/node.go b/controllers/node.go index 9ae4ea64..6aadbc7a 100644 --- a/controllers/node.go +++ b/controllers/node.go @@ -5,7 +5,6 @@ import ( "fmt" "net/http" "strings" - "time" "github.com/gorilla/mux" "github.com/gravitl/netmaker/database" @@ -418,8 +417,6 @@ func createNode(w http.ResponseWriter, r *http.Request) { logger.Log(1, r.Header.Get("user"), "created new node", node.Name, "on network", node.Network) w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(node) - - runUpdates(&node, false, false) } //Takes node out of pending state @@ -437,7 +434,7 @@ func uncordonNode(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode("SUCCESS") - runUpdates(&node, true, false) + mq.NodeUpdate(&node) } func createEgressGateway(w http.ResponseWriter, r *http.Request) { @@ -623,10 +620,10 @@ func deleteNode(w http.ResponseWriter, r *http.Request) { returnSuccessResponse(w, r, nodeid+" deleted.") logger.Log(1, r.Header.Get("user"), "Deleted node", nodeid, "from network", params["network"]) - runUpdates(&node, false, true) + runUpdates(&node, false) } -func runUpdates(node *models.Node, nodeUpdate bool, requiresPause bool) error { +func runUpdates(node *models.Node, nodeUpdate bool) error { //don't publish to server node if nodeUpdate && !isServer(node) { @@ -636,11 +633,7 @@ func runUpdates(node *models.Node, nodeUpdate bool, requiresPause bool) error { } } - if requiresPause { // TODO in future, detect when a node has finished iface update - time.Sleep(time.Second * 10) - } - - if err := runServerPeerUpdate(node, isServer(node)); err != nil { + if err := runServerUpdate(node, isServer(node)); err != nil { logger.Log(1, "internal error when running peer node:", err.Error()) return err } diff --git a/controllers/server_util.go b/controllers/server_util.go index fef90374..345aa111 100644 --- a/controllers/server_util.go +++ b/controllers/server_util.go @@ -4,21 +4,17 @@ import ( "github.com/gravitl/netmaker/logger" "github.com/gravitl/netmaker/logic" "github.com/gravitl/netmaker/models" - "github.com/gravitl/netmaker/mq" "github.com/gravitl/netmaker/servercfg" ) -func runServerPeerUpdate(node *models.Node, ifaceDelta bool) error { +// updates local peers for a server on a given node's network +func runServerUpdate(node *models.Node, ifaceDelta bool) error { err := logic.TimerCheckpoint() if err != nil { logger.Log(3, "error occurred on timer,", err.Error()) } - if err := mq.PublishPeerUpdate(node); err != nil { - logger.Log(0, "failed to inform peers of new node ", err.Error()) - } - if servercfg.IsClientMode() != "on" { return nil } @@ -26,6 +22,7 @@ func runServerPeerUpdate(node *models.Node, ifaceDelta bool) error { if err != nil { return getErr } + if err = logic.ServerUpdate(¤tServerNode, ifaceDelta); err != nil { logger.Log(1, "server node:", currentServerNode.ID, "failed update") return err diff --git a/mq/mq.go b/mq/mq.go index 6ce8ec3d..9cbdb2ec 100644 --- a/mq/mq.go +++ b/mq/mq.go @@ -45,7 +45,7 @@ func SetupMQTT(publish bool) mqtt.Client { client.Disconnect(240) logger.Log(0, "node update subscription failed") } - if token := client.Subscribe("clients/#", 0, mqtt.MessageHandler(ClientPeerUpdate)); token.Wait() && token.Error() != nil { + if token := client.Subscribe("signal/#", 0, mqtt.MessageHandler(ClientPeerUpdate)); token.Wait() && token.Error() != nil { client.Disconnect(240) logger.Log(0, "node client subscription failed") } diff --git a/netclient/functions/daemon.go b/netclient/functions/daemon.go index 7552de99..09fa0857 100644 --- a/netclient/functions/daemon.go +++ b/netclient/functions/daemon.go @@ -26,7 +26,6 @@ import ( ) // == Message Caches == -// var keepalive = new(sync.Map) var messageCache = new(sync.Map) var networkcontext = new(sync.Map) @@ -367,7 +366,7 @@ func PublishNodeUpdate(cfg *config.ClientConfig) error { if err != nil { return err } - if err = publish(cfg, fmt.Sprintf("update/%s", cfg.Node.ID), data); err != nil { + if err = publish(cfg, fmt.Sprintf("update/%s", cfg.Node.ID), data, 1); err != nil { return err } return nil @@ -375,14 +374,13 @@ func PublishNodeUpdate(cfg *config.ClientConfig) error { // Hello -- ping the broker to let server know node is alive and doing fine func Hello(cfg *config.ClientConfig, network string) { - if err := publish(cfg, fmt.Sprintf("ping/%s", cfg.Node.ID), []byte(ncutils.Version)); err != nil { + if err := publish(cfg, fmt.Sprintf("ping/%s", cfg.Node.ID), []byte(ncutils.Version), 0); err != nil { ncutils.Log(fmt.Sprintf("error publishing ping, %v", err)) ncutils.Log("running pull on " + cfg.Node.Network + " to reconnect") _, err := Pull(cfg.Node.Network, true) if err != nil { ncutils.Log("could not run pull on " + cfg.Node.Network + ", error: " + err.Error()) } - } } @@ -479,7 +477,7 @@ func setupMQTT(cfg *config.ClientConfig, publish bool) mqtt.Client { // publishes a message to server to update peers on this peer's behalf func publishClientPeers(cfg *config.ClientConfig) error { payload := []byte(ncutils.MakeRandomString(16)) // just random string for now to keep the bytes different - if err := publish(cfg, fmt.Sprintf("update/%s", cfg.Node.ID), payload); err != nil { + if err := publish(cfg, fmt.Sprintf("signal/%s", cfg.Node.ID), payload, 1); err != nil { return err } return nil @@ -513,7 +511,7 @@ func initialPull(network string) { } } -func publish(cfg *config.ClientConfig, dest string, msg []byte) error { +func publish(cfg *config.ClientConfig, dest string, msg []byte, qos byte) error { // setup the keys trafficPrivKey, err := auth.RetrieveTrafficKey(cfg.Node.Network) if err != nil { @@ -532,7 +530,7 @@ func publish(cfg *config.ClientConfig, dest string, msg []byte) error { return err } - if token := client.Publish(dest, 0, false, encrypted); token.Wait() && token.Error() != nil { + if token := client.Publish(dest, qos, false, encrypted); token.Wait() && token.Error() != nil { return token.Error() } return nil From f1b5518bf3bbff2676eb978988fdc72485fa43e0 Mon Sep 17 00:00:00 2001 From: "Matthew R. Kasun" Date: Wed, 16 Feb 2022 19:50:10 -0500 Subject: [PATCH 3/5] wip --- controllers/node.go | 31 +++++++++++------- controllers/node_grpc.go | 69 ++++++++++++++++++++-------------------- 2 files changed, 53 insertions(+), 47 deletions(-) diff --git a/controllers/node.go b/controllers/node.go index 6aadbc7a..b2156a9b 100644 --- a/controllers/node.go +++ b/controllers/node.go @@ -2,6 +2,7 @@ package controller import ( "encoding/json" + "errors" "fmt" "net/http" "strings" @@ -585,7 +586,18 @@ func updateNode(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(newNode) - runUpdates(&newNode, true, ifaceDelta) + if isServer(&node) { + if ifaceDelta { + if err := mq.PublishPeerUpdate(&node); err != nil { + logger.Log(1, "failed to publish peer update "+err.Error()) + } + } + if err := logic.UpdateNode(&node, &newNode); err != nil { + logger.Log(1, "error updating server node "+err.Error()) + } + } else { + runUpdates(&newNode) + } } func deleteNode(w http.ResponseWriter, r *http.Request) { @@ -620,23 +632,18 @@ func deleteNode(w http.ResponseWriter, r *http.Request) { returnSuccessResponse(w, r, nodeid+" deleted.") logger.Log(1, r.Header.Get("user"), "Deleted node", nodeid, "from network", params["network"]) + //:w runUpdates(&node, false) } -func runUpdates(node *models.Node, nodeUpdate bool) error { +func runUpdates(node *models.Node) error { //don't publish to server node - - if nodeUpdate && !isServer(node) { - if err := mq.NodeUpdate(node); err != nil { - logger.Log(1, "error publishing node update", err.Error()) - return err - } + if isServer(node) { + return errors.New("update to server node not permited") } - - if err := runServerUpdate(node, isServer(node)); err != nil { - logger.Log(1, "internal error when running peer node:", err.Error()) + if err := mq.NodeUpdate(node); err != nil { + logger.Log(1, "error publishing node update", err.Error()) return err } - return nil } diff --git a/controllers/node_grpc.go b/controllers/node_grpc.go index 3778da99..f0e10e6e 100644 --- a/controllers/node_grpc.go +++ b/controllers/node_grpc.go @@ -13,7 +13,6 @@ import ( "github.com/gravitl/netmaker/logic" "github.com/gravitl/netmaker/models" "github.com/gravitl/netmaker/mq" - "github.com/gravitl/netmaker/servercfg" "github.com/gravitl/netmaker/serverctl" ) @@ -107,7 +106,7 @@ func (s *NodeServiceServer) CreateNode(ctx context.Context, req *nodepb.Object) Type: nodepb.NODE_TYPE, } - runUpdates(&node, false, false) + //runUpdates(&node, false, false) go func(node *models.Node) { if node.UDPHolePunch == "yes" { @@ -134,47 +133,47 @@ func (s *NodeServiceServer) CreateNode(ctx context.Context, req *nodepb.Object) } // NodeServiceServer.UpdateNode updates a node and responds over gRPC -func (s *NodeServiceServer) UpdateNode(ctx context.Context, req *nodepb.Object) (*nodepb.Object, error) { +// func (s *NodeServiceServer) UpdateNode(ctx context.Context, req *nodepb.Object) (*nodepb.Object, error) { - var newnode models.Node - if err := json.Unmarshal([]byte(req.GetData()), &newnode); err != nil { - return nil, err - } +// var newnode models.Node +// if err := json.Unmarshal([]byte(req.GetData()), &newnode); err != nil { +// return nil, err +// } - node, err := logic.GetNodeByID(newnode.ID) - if err != nil { - return nil, err - } +// node, err := logic.GetNodeByID(newnode.ID) +// if err != nil { +// return nil, err +// } - ifaceDelta := logic.IfaceDelta(&node, &newnode) +// ifaceDelta := logic.IfaceDelta(&node, &newnode) - if !servercfg.GetRce() { - newnode.PostDown = node.PostDown - newnode.PostUp = node.PostUp - } +// if !servercfg.GetRce() { +// newnode.PostDown = node.PostDown +// newnode.PostUp = node.PostUp +// } - err = logic.UpdateNode(&node, &newnode) - if err != nil { - return nil, err - } - newnode.NetworkSettings, err = logic.GetNetworkSettings(node.Network) - if err != nil { - return nil, err - } - getServerAddrs(&newnode) +// err = logic.UpdateNode(&node, &newnode) +// if err != nil { +// return nil, err +// } +// newnode.NetworkSettings, err = logic.GetNetworkSettings(node.Network) +// if err != nil { +// return nil, err +// } +// getServerAddrs(&newnode) - nodeData, errN := json.Marshal(&newnode) - if errN != nil { - return nil, err - } +// nodeData, errN := json.Marshal(&newnode) +// if errN != nil { +// return nil, err +// } - runUpdates(&newnode, false, ifaceDelta) +// runUpdates(&newnode, false, ifaceDelta) - return &nodepb.Object{ - Data: string(nodeData), - Type: nodepb.NODE_TYPE, - }, nil -} +// return &nodepb.Object{ +// Data: string(nodeData), +// Type: nodepb.NODE_TYPE, +// }, nil +// } func getServerAddrs(node *models.Node) { serverNodes := logic.GetServerNodes(node.Network) From 922ebcb4abf51403f04535c0f731a4e513e83bf4 Mon Sep 17 00:00:00 2001 From: 0xdcarns Date: Wed, 16 Feb 2022 20:42:57 -0500 Subject: [PATCH 4/5] refacted running node updates --- controllers/network.go | 16 ++------ controllers/node.go | 82 +++++++++++++++++++++++--------------- controllers/node_grpc.go | 72 ++++++++++++++++----------------- controllers/relay.go | 4 +- controllers/server_util.go | 31 -------------- mq/publishers.go | 2 +- 6 files changed, 90 insertions(+), 117 deletions(-) delete mode 100644 controllers/server_util.go diff --git a/controllers/network.go b/controllers/network.go index 95683061..088d064c 100644 --- a/controllers/network.go +++ b/controllers/network.go @@ -3,7 +3,6 @@ package controller import ( "encoding/json" "errors" - "fmt" "net/http" "strings" @@ -12,7 +11,6 @@ import ( "github.com/gravitl/netmaker/logger" "github.com/gravitl/netmaker/logic" "github.com/gravitl/netmaker/models" - "github.com/gravitl/netmaker/mq" "github.com/gravitl/netmaker/servercfg" ) @@ -109,17 +107,9 @@ func keyUpdate(w http.ResponseWriter, r *http.Request) { return } for _, node := range nodes { - fmt.Println("updating node ", node.Name, " for a key update") - if err := mq.NodeUpdate(&node); err != nil { - logger.Log(2, "failed key update ", node.Name) - } + logger.Log(3, "updating node ", node.Name, " for a key update") + runUpdates(&node, true) } - node, err := logic.GetNetworkServerLeader(netname) - if err != nil { - logger.Log(2, "failed to get server node") - return - } - runUpdates(&node, false, false) } // Update a network @@ -184,7 +174,7 @@ func updateNetwork(w http.ResponseWriter, r *http.Request) { return } for _, node := range nodes { - runUpdates(&node, true, false) + runUpdates(&node, true) } } diff --git a/controllers/node.go b/controllers/node.go index b2156a9b..dfc0442a 100644 --- a/controllers/node.go +++ b/controllers/node.go @@ -2,7 +2,6 @@ package controller import ( "encoding/json" - "errors" "fmt" "net/http" "strings" @@ -396,8 +395,8 @@ func createNode(w http.ResponseWriter, r *http.Request) { validKey := logic.IsKeyValid(networkName, node.AccessKey) if !validKey { - //Check to see if network will allow manual sign up - //may want to switch this up with the valid key check and avoid a DB call that way. + // Check to see if network will allow manual sign up + // may want to switch this up with the valid key check and avoid a DB call that way. if network.AllowManualSignUp == "yes" { node.IsPending = "yes" } else { @@ -420,8 +419,8 @@ func createNode(w http.ResponseWriter, r *http.Request) { json.NewEncoder(w).Encode(node) } -//Takes node out of pending state -//TODO: May want to use cordon/uncordon terminology instead of "ispending". +// Takes node out of pending state +// TODO: May want to use cordon/uncordon terminology instead of "ispending". func uncordonNode(w http.ResponseWriter, r *http.Request) { var params = mux.Vars(r) w.Header().Set("Content-Type", "application/json") @@ -438,6 +437,8 @@ func uncordonNode(w http.ResponseWriter, r *http.Request) { mq.NodeUpdate(&node) } +// == EGRESS == + func createEgressGateway(w http.ResponseWriter, r *http.Request) { var gateway models.EgressGatewayRequest var params = mux.Vars(r) @@ -459,7 +460,7 @@ func createEgressGateway(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(node) - runUpdates(&node, true, false) + runUpdates(&node, true) } func deleteEgressGateway(w http.ResponseWriter, r *http.Request) { @@ -477,7 +478,7 @@ func deleteEgressGateway(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(node) - runUpdates(&node, true, false) + runUpdates(&node, true) } // == INGRESS == @@ -497,7 +498,7 @@ func createIngressGateway(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(node) - runUpdates(&node, true, false) + runUpdates(&node, true) } func deleteIngressGateway(w http.ResponseWriter, r *http.Request) { @@ -514,7 +515,7 @@ func deleteIngressGateway(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(node) - runUpdates(&node, true, false) + runUpdates(&node, true) } func updateNode(w http.ResponseWriter, r *http.Request) { @@ -570,10 +571,7 @@ func updateNode(w http.ResponseWriter, r *http.Request) { } if len(updatenodes) > 0 { for _, relayedNode := range updatenodes { - err = mq.NodeUpdate(&relayedNode) - if err != nil { - logger.Log(1, "error sending update to relayed node ", relayedNode.Address, "on network", node.Network, ": ", err.Error()) - } + runUpdates(&relayedNode, false) } } } @@ -586,18 +584,7 @@ func updateNode(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(newNode) - if isServer(&node) { - if ifaceDelta { - if err := mq.PublishPeerUpdate(&node); err != nil { - logger.Log(1, "failed to publish peer update "+err.Error()) - } - } - if err := logic.UpdateNode(&node, &newNode); err != nil { - logger.Log(1, "error updating server node "+err.Error()) - } - } else { - runUpdates(&newNode) - } + runUpdates(&newNode, ifaceDelta) } func deleteNode(w http.ResponseWriter, r *http.Request) { @@ -632,17 +619,48 @@ func deleteNode(w http.ResponseWriter, r *http.Request) { returnSuccessResponse(w, r, nodeid+" deleted.") logger.Log(1, r.Header.Get("user"), "Deleted node", nodeid, "from network", params["network"]) - //:w runUpdates(&node, false) } -func runUpdates(node *models.Node) error { - //don't publish to server node - if isServer(node) { - return errors.New("update to server node not permited") +func runUpdates(node *models.Node, ifaceDelta bool) { + go func() { // don't block http response + if isServer(node) { // don't publish to server node + if err := runServerUpdate(node, ifaceDelta); err != nil { + logger.Log(1, "error running server update", err.Error()) + } + } else { // publish node update if not server + if err := mq.NodeUpdate(node); err != nil { + logger.Log(1, "error publishing node update to node", node.Name, node.ID, err.Error()) + } + } + }() +} + +// updates local peers for a server on a given node's network +func runServerUpdate(serverNode *models.Node, ifaceDelta bool) error { + + err := logic.TimerCheckpoint() + if err != nil { + logger.Log(3, "error occurred on timer,", err.Error()) } - if err := mq.NodeUpdate(node); err != nil { - logger.Log(1, "error publishing node update", err.Error()) + + if servercfg.IsClientMode() != "on" { + return nil + } + + if ifaceDelta { + if err := mq.PublishPeerUpdate(serverNode); err != nil { + logger.Log(1, "failed to publish peer update "+err.Error()) + } + } + + var currentServerNode, getErr = logic.GetNetworkServerLeader(serverNode.Network) + if err != nil { + return getErr + } + + if err = logic.ServerUpdate(¤tServerNode, ifaceDelta); err != nil { + logger.Log(1, "server node:", currentServerNode.ID, "failed update") return err } return nil diff --git a/controllers/node_grpc.go b/controllers/node_grpc.go index f0e10e6e..d6a074b8 100644 --- a/controllers/node_grpc.go +++ b/controllers/node_grpc.go @@ -13,6 +13,7 @@ import ( "github.com/gravitl/netmaker/logic" "github.com/gravitl/netmaker/models" "github.com/gravitl/netmaker/mq" + "github.com/gravitl/netmaker/servercfg" "github.com/gravitl/netmaker/serverctl" ) @@ -106,8 +107,6 @@ func (s *NodeServiceServer) CreateNode(ctx context.Context, req *nodepb.Object) Type: nodepb.NODE_TYPE, } - //runUpdates(&node, false, false) - go func(node *models.Node) { if node.UDPHolePunch == "yes" { var currentServerNode, getErr = logic.GetNetworkServerLeader(node.Network) @@ -133,47 +132,44 @@ func (s *NodeServiceServer) CreateNode(ctx context.Context, req *nodepb.Object) } // NodeServiceServer.UpdateNode updates a node and responds over gRPC -// func (s *NodeServiceServer) UpdateNode(ctx context.Context, req *nodepb.Object) (*nodepb.Object, error) { +// DELETE ONE DAY - DEPRECATED +func (s *NodeServiceServer) UpdateNode(ctx context.Context, req *nodepb.Object) (*nodepb.Object, error) { -// var newnode models.Node -// if err := json.Unmarshal([]byte(req.GetData()), &newnode); err != nil { -// return nil, err -// } + var newnode models.Node + if err := json.Unmarshal([]byte(req.GetData()), &newnode); err != nil { + return nil, err + } -// node, err := logic.GetNodeByID(newnode.ID) -// if err != nil { -// return nil, err -// } + node, err := logic.GetNodeByID(newnode.ID) + if err != nil { + return nil, err + } -// ifaceDelta := logic.IfaceDelta(&node, &newnode) + if !servercfg.GetRce() { + newnode.PostDown = node.PostDown + newnode.PostUp = node.PostUp + } -// if !servercfg.GetRce() { -// newnode.PostDown = node.PostDown -// newnode.PostUp = node.PostUp -// } + err = logic.UpdateNode(&node, &newnode) + if err != nil { + return nil, err + } + newnode.NetworkSettings, err = logic.GetNetworkSettings(node.Network) + if err != nil { + return nil, err + } + getServerAddrs(&newnode) -// err = logic.UpdateNode(&node, &newnode) -// if err != nil { -// return nil, err -// } -// newnode.NetworkSettings, err = logic.GetNetworkSettings(node.Network) -// if err != nil { -// return nil, err -// } -// getServerAddrs(&newnode) + nodeData, errN := json.Marshal(&newnode) + if errN != nil { + return nil, err + } -// nodeData, errN := json.Marshal(&newnode) -// if errN != nil { -// return nil, err -// } - -// runUpdates(&newnode, false, ifaceDelta) - -// return &nodepb.Object{ -// Data: string(nodeData), -// Type: nodepb.NODE_TYPE, -// }, nil -// } + return &nodepb.Object{ + Data: string(nodeData), + Type: nodepb.NODE_TYPE, + }, nil +} func getServerAddrs(node *models.Node) { serverNodes := logic.GetServerNodes(node.Network) @@ -218,7 +214,7 @@ func (s *NodeServiceServer) DeleteNode(ctx context.Context, req *nodepb.Object) return nil, err } - runServerPeerUpdate(&node, false) + runUpdates(&node, false) return &nodepb.Object{ Data: "success", diff --git a/controllers/relay.go b/controllers/relay.go index 4c3d4fa0..49a4cfd8 100644 --- a/controllers/relay.go +++ b/controllers/relay.go @@ -36,7 +36,7 @@ func createRelay(w http.ResponseWriter, r *http.Request) { } w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(node) - runUpdates(&node, true, false) + runUpdates(&node, true) } func deleteRelay(w http.ResponseWriter, r *http.Request) { @@ -58,5 +58,5 @@ func deleteRelay(w http.ResponseWriter, r *http.Request) { } w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(node) - runUpdates(&node, true, false) + runUpdates(&node, true) } diff --git a/controllers/server_util.go b/controllers/server_util.go deleted file mode 100644 index 345aa111..00000000 --- a/controllers/server_util.go +++ /dev/null @@ -1,31 +0,0 @@ -package controller - -import ( - "github.com/gravitl/netmaker/logger" - "github.com/gravitl/netmaker/logic" - "github.com/gravitl/netmaker/models" - "github.com/gravitl/netmaker/servercfg" -) - -// updates local peers for a server on a given node's network -func runServerUpdate(node *models.Node, ifaceDelta bool) error { - - err := logic.TimerCheckpoint() - if err != nil { - logger.Log(3, "error occurred on timer,", err.Error()) - } - - if servercfg.IsClientMode() != "on" { - return nil - } - var currentServerNode, getErr = logic.GetNetworkServerLeader(node.Network) - if err != nil { - return getErr - } - - if err = logic.ServerUpdate(¤tServerNode, ifaceDelta); err != nil { - logger.Log(1, "server node:", currentServerNode.ID, "failed update") - return err - } - return nil -} diff --git a/mq/publishers.go b/mq/publishers.go index b7b32e2d..35177934 100644 --- a/mq/publishers.go +++ b/mq/publishers.go @@ -71,7 +71,7 @@ func PublishExtPeerUpdate(node *models.Node) error { // NodeUpdate -- publishes a node update func NodeUpdate(node *models.Node) error { - if !servercfg.IsMessageQueueBackend() { + if !servercfg.IsMessageQueueBackend() || node.IsServer == "yes" { return nil } logger.Log(3, "publishing node update to "+node.Name) From 43b6b8c6cbf6991e8b3b27edfca50aa0ce6650b2 Mon Sep 17 00:00:00 2001 From: 0xdcarns Date: Wed, 16 Feb 2022 21:10:02 -0500 Subject: [PATCH 5/5] updated serverUpdate check --- controllers/node.go | 52 ++++++++++++++++++++++++--------------------- 1 file changed, 28 insertions(+), 24 deletions(-) diff --git a/controllers/node.go b/controllers/node.go index dfc0442a..c8fa5f57 100644 --- a/controllers/node.go +++ b/controllers/node.go @@ -5,6 +5,7 @@ import ( "fmt" "net/http" "strings" + "sync" "github.com/gorilla/mux" "github.com/gravitl/netmaker/database" @@ -434,7 +435,7 @@ func uncordonNode(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode("SUCCESS") - mq.NodeUpdate(&node) + runUpdates(&node, false) } // == EGRESS == @@ -624,42 +625,45 @@ func deleteNode(w http.ResponseWriter, r *http.Request) { func runUpdates(node *models.Node, ifaceDelta bool) { go func() { // don't block http response - if isServer(node) { // don't publish to server node - if err := runServerUpdate(node, ifaceDelta); err != nil { - logger.Log(1, "error running server update", err.Error()) - } - } else { // publish node update if not server - if err := mq.NodeUpdate(node); err != nil { - logger.Log(1, "error publishing node update to node", node.Name, node.ID, err.Error()) - } + err := logic.TimerCheckpoint() + if err != nil { + logger.Log(3, "error occurred on timer,", err.Error()) + } + if err := runServerUpdate(node, ifaceDelta); err != nil { + logger.Log(1, "error running server update", err.Error()) + } + // publish node update if not server + if err := mq.NodeUpdate(node); err != nil { + logger.Log(1, "error publishing node update to node", node.Name, node.ID, err.Error()) } }() } // updates local peers for a server on a given node's network -func runServerUpdate(serverNode *models.Node, ifaceDelta bool) error { - - err := logic.TimerCheckpoint() - if err != nil { - logger.Log(3, "error occurred on timer,", err.Error()) - } - +func runServerUpdate(node *models.Node, ifaceDelta bool) error { + var mutex sync.Mutex + mutex.Lock() + defer mutex.Unlock() if servercfg.IsClientMode() != "on" { return nil } - if ifaceDelta { - if err := mq.PublishPeerUpdate(serverNode); err != nil { + if !isServer(node) && ifaceDelta { + ifaceDelta = false + } + + currentServerNode, err := logic.GetNetworkServerLocal(node.Network) + if err != nil { + return err + } + + if ifaceDelta && logic.IsLeader(¤tServerNode) { + if err := mq.PublishPeerUpdate(¤tServerNode); err != nil { logger.Log(1, "failed to publish peer update "+err.Error()) } } - var currentServerNode, getErr = logic.GetNetworkServerLeader(serverNode.Network) - if err != nil { - return getErr - } - - if err = logic.ServerUpdate(¤tServerNode, ifaceDelta); err != nil { + if err := logic.ServerUpdate(¤tServerNode, ifaceDelta); err != nil { logger.Log(1, "server node:", currentServerNode.ID, "failed update") return err }