diff --git a/controllers/network.go b/controllers/network.go index 95683061..088d064c 100644 --- a/controllers/network.go +++ b/controllers/network.go @@ -3,7 +3,6 @@ package controller import ( "encoding/json" "errors" - "fmt" "net/http" "strings" @@ -12,7 +11,6 @@ import ( "github.com/gravitl/netmaker/logger" "github.com/gravitl/netmaker/logic" "github.com/gravitl/netmaker/models" - "github.com/gravitl/netmaker/mq" "github.com/gravitl/netmaker/servercfg" ) @@ -109,17 +107,9 @@ func keyUpdate(w http.ResponseWriter, r *http.Request) { return } for _, node := range nodes { - fmt.Println("updating node ", node.Name, " for a key update") - if err := mq.NodeUpdate(&node); err != nil { - logger.Log(2, "failed key update ", node.Name) - } + logger.Log(3, "updating node ", node.Name, " for a key update") + runUpdates(&node, true) } - node, err := logic.GetNetworkServerLeader(netname) - if err != nil { - logger.Log(2, "failed to get server node") - return - } - runUpdates(&node, false, false) } // Update a network @@ -184,7 +174,7 @@ func updateNetwork(w http.ResponseWriter, r *http.Request) { return } for _, node := range nodes { - runUpdates(&node, true, false) + runUpdates(&node, true) } } diff --git a/controllers/node.go b/controllers/node.go index 9ae4ea64..c8fa5f57 100644 --- a/controllers/node.go +++ b/controllers/node.go @@ -5,7 +5,7 @@ import ( "fmt" "net/http" "strings" - "time" + "sync" "github.com/gorilla/mux" "github.com/gravitl/netmaker/database" @@ -396,8 +396,8 @@ func createNode(w http.ResponseWriter, r *http.Request) { validKey := logic.IsKeyValid(networkName, node.AccessKey) if !validKey { - //Check to see if network will allow manual sign up - //may want to switch this up with the valid key check and avoid a DB call that way. + // Check to see if network will allow manual sign up + // may want to switch this up with the valid key check and avoid a DB call that way. 
if network.AllowManualSignUp == "yes" { node.IsPending = "yes" } else { @@ -418,12 +418,10 @@ func createNode(w http.ResponseWriter, r *http.Request) { logger.Log(1, r.Header.Get("user"), "created new node", node.Name, "on network", node.Network) w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(node) - - runUpdates(&node, false, false) } -//Takes node out of pending state -//TODO: May want to use cordon/uncordon terminology instead of "ispending". +// Takes node out of pending state +// TODO: May want to use cordon/uncordon terminology instead of "ispending". func uncordonNode(w http.ResponseWriter, r *http.Request) { var params = mux.Vars(r) w.Header().Set("Content-Type", "application/json") @@ -437,9 +435,11 @@ func uncordonNode(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode("SUCCESS") - runUpdates(&node, true, false) + runUpdates(&node, false) } +// == EGRESS == + func createEgressGateway(w http.ResponseWriter, r *http.Request) { var gateway models.EgressGatewayRequest var params = mux.Vars(r) @@ -461,7 +461,7 @@ func createEgressGateway(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(node) - runUpdates(&node, true, false) + runUpdates(&node, true) } func deleteEgressGateway(w http.ResponseWriter, r *http.Request) { @@ -479,7 +479,7 @@ func deleteEgressGateway(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(node) - runUpdates(&node, true, false) + runUpdates(&node, true) } // == INGRESS == @@ -499,7 +499,7 @@ func createIngressGateway(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(node) - runUpdates(&node, true, false) + runUpdates(&node, true) } func deleteIngressGateway(w http.ResponseWriter, r *http.Request) { @@ -516,7 +516,7 @@ func deleteIngressGateway(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(node) - 
runUpdates(&node, true, false) + runUpdates(&node, true) } func updateNode(w http.ResponseWriter, r *http.Request) { @@ -572,10 +572,7 @@ func updateNode(w http.ResponseWriter, r *http.Request) { } if len(updatenodes) > 0 { for _, relayedNode := range updatenodes { - err = mq.NodeUpdate(&relayedNode) - if err != nil { - logger.Log(1, "error sending update to relayed node ", relayedNode.Address, "on network", node.Network, ": ", err.Error()) - } + runUpdates(&relayedNode, false) } } } @@ -588,7 +585,7 @@ func updateNode(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(newNode) - runUpdates(&newNode, true, ifaceDelta) + runUpdates(&newNode, ifaceDelta) } func deleteNode(w http.ResponseWriter, r *http.Request) { @@ -623,27 +620,52 @@ func deleteNode(w http.ResponseWriter, r *http.Request) { returnSuccessResponse(w, r, nodeid+" deleted.") logger.Log(1, r.Header.Get("user"), "Deleted node", nodeid, "from network", params["network"]) - runUpdates(&node, false, true) + runUpdates(&node, false) } -func runUpdates(node *models.Node, nodeUpdate bool, requiresPause bool) error { - //don't publish to server node - - if nodeUpdate && !isServer(node) { - if err := mq.NodeUpdate(node); err != nil { - logger.Log(1, "error publishing node update", err.Error()) - return err +func runUpdates(node *models.Node, ifaceDelta bool) { + go func() { // don't block http response + err := logic.TimerCheckpoint() + if err != nil { + logger.Log(3, "error occurred on timer,", err.Error()) } + if err := runServerUpdate(node, ifaceDelta); err != nil { + logger.Log(1, "error running server update", err.Error()) + } + // publish node update if not server + if err := mq.NodeUpdate(node); err != nil { + logger.Log(1, "error publishing node update to node", node.Name, node.ID, err.Error()) + } + }() +} + +// updates local peers for a server on a given node's network +func runServerUpdate(node *models.Node, ifaceDelta bool) error { + var mutex sync.Mutex + 
mutex.Lock() + defer mutex.Unlock() + if servercfg.IsClientMode() != "on" { + return nil } - if requiresPause { // TODO in future, detect when a node has finished iface update - time.Sleep(time.Second * 10) + if !isServer(node) && ifaceDelta { + ifaceDelta = false } - if err := runServerPeerUpdate(node, isServer(node)); err != nil { - logger.Log(1, "internal error when running peer node:", err.Error()) + currentServerNode, err := logic.GetNetworkServerLocal(node.Network) + if err != nil { return err } + if ifaceDelta && logic.IsLeader(&currentServerNode) { + if err := mq.PublishPeerUpdate(&currentServerNode); err != nil { + logger.Log(1, "failed to publish peer update "+err.Error()) + } + } + + if err := logic.ServerUpdate(&currentServerNode, ifaceDelta); err != nil { + logger.Log(1, "server node:", currentServerNode.ID, "failed update") + return err + } return nil } diff --git a/controllers/node_grpc.go b/controllers/node_grpc.go index 3778da99..d6a074b8 100644 --- a/controllers/node_grpc.go +++ b/controllers/node_grpc.go @@ -107,8 +107,6 @@ func (s *NodeServiceServer) CreateNode(ctx context.Context, req *nodepb.Object) Type: nodepb.NODE_TYPE, } - runUpdates(&node, false, false) - go func(node *models.Node) { if node.UDPHolePunch == "yes" { var currentServerNode, getErr = logic.GetNetworkServerLeader(node.Network) @@ -134,6 +132,7 @@ func (s *NodeServiceServer) CreateNode(ctx context.Context, req *nodepb.Object) } // NodeServiceServer.UpdateNode updates a node and responds over gRPC +// DELETE ONE DAY - DEPRECATED func (s *NodeServiceServer) UpdateNode(ctx context.Context, req *nodepb.Object) (*nodepb.Object, error) { var newnode models.Node @@ -146,8 +145,6 @@ func (s *NodeServiceServer) UpdateNode(ctx context.Context, req *nodepb.Object) return nil, err } - ifaceDelta := logic.IfaceDelta(&node, &newnode) - if !servercfg.GetRce() { newnode.PostDown = node.PostDown newnode.PostUp = node.PostUp @@ -168,8 +165,6 @@ func (s *NodeServiceServer) UpdateNode(ctx context.Context, req 
*nodepb.Object) return nil, err } - runUpdates(&newnode, false, ifaceDelta) - return &nodepb.Object{ Data: string(nodeData), Type: nodepb.NODE_TYPE, @@ -219,7 +214,7 @@ func (s *NodeServiceServer) DeleteNode(ctx context.Context, req *nodepb.Object) return nil, err } - runServerPeerUpdate(&node, false) + runUpdates(&node, false) return &nodepb.Object{ Data: "success", diff --git a/controllers/relay.go b/controllers/relay.go index 4c3d4fa0..49a4cfd8 100644 --- a/controllers/relay.go +++ b/controllers/relay.go @@ -36,7 +36,7 @@ func createRelay(w http.ResponseWriter, r *http.Request) { } w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(node) - runUpdates(&node, true, false) + runUpdates(&node, true) } func deleteRelay(w http.ResponseWriter, r *http.Request) { @@ -58,5 +58,5 @@ func deleteRelay(w http.ResponseWriter, r *http.Request) { } w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(node) - runUpdates(&node, true, false) + runUpdates(&node, true) } diff --git a/controllers/server_util.go b/controllers/server_util.go deleted file mode 100644 index fef90374..00000000 --- a/controllers/server_util.go +++ /dev/null @@ -1,34 +0,0 @@ -package controller - -import ( - "github.com/gravitl/netmaker/logger" - "github.com/gravitl/netmaker/logic" - "github.com/gravitl/netmaker/models" - "github.com/gravitl/netmaker/mq" - "github.com/gravitl/netmaker/servercfg" -) - -func runServerPeerUpdate(node *models.Node, ifaceDelta bool) error { - - err := logic.TimerCheckpoint() - if err != nil { - logger.Log(3, "error occurred on timer,", err.Error()) - } - - if err := mq.PublishPeerUpdate(node); err != nil { - logger.Log(0, "failed to inform peers of new node ", err.Error()) - } - - if servercfg.IsClientMode() != "on" { - return nil - } - var currentServerNode, getErr = logic.GetNetworkServerLeader(node.Network) - if err != nil { - return getErr - } - if err = logic.ServerUpdate(&currentServerNode, ifaceDelta); err != nil { - logger.Log(1, "server node:", currentServerNode.ID, 
"failed update") - return err - } - return nil -} diff --git a/mq/handlers.go b/mq/handlers.go new file mode 100644 index 00000000..8f624ad9 --- /dev/null +++ b/mq/handlers.go @@ -0,0 +1,105 @@ +package mq + +import ( + "encoding/json" + + mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/gravitl/netmaker/database" + "github.com/gravitl/netmaker/logger" + "github.com/gravitl/netmaker/logic" + "github.com/gravitl/netmaker/models" +) + +// DefaultHandler default message queue handler - only called when GetDebug == true +func DefaultHandler(client mqtt.Client, msg mqtt.Message) { + logger.Log(0, "MQTT Message: Topic: ", string(msg.Topic()), " Message: ", string(msg.Payload())) +} + +// Ping message Handler -- handles ping topic from client nodes +func Ping(client mqtt.Client, msg mqtt.Message) { + logger.Log(0, "Ping Handler: ", msg.Topic()) + go func() { + id, err := getID(msg.Topic()) + if err != nil { + logger.Log(0, "error getting node.ID sent on ping topic ") + return + } + node, err := logic.GetNodeByID(id) + if err != nil { + logger.Log(0, "mq-ping error getting node: ", err.Error()) + record, err := database.FetchRecord(database.NODES_TABLE_NAME, id) + if err != nil { + logger.Log(0, "error reading database ", err.Error()) + return + } + logger.Log(0, "record from database") + logger.Log(0, record) + return + } + _, decryptErr := decryptMsg(&node, msg.Payload()) + if decryptErr != nil { + logger.Log(0, "error decrypting when updating node ", node.ID, decryptErr.Error()) + return + } + node.SetLastCheckIn() + if err := logic.UpdateNode(&node, &node); err != nil { + logger.Log(0, "error updating node", node.Name, node.ID, " on checkin", err.Error()) + return + } + logger.Log(3, "ping processed for node", node.ID) + // --TODO --set client version once feature is implemented. 
+ //node.SetClientVersion(msg.Payload()) + }() +} + +// UpdateNode message Handler -- handles updates from client nodes +func UpdateNode(client mqtt.Client, msg mqtt.Message) { + go func() { + id, err := getID(msg.Topic()) + if err != nil { + logger.Log(1, "error getting node.ID sent on ", msg.Topic(), err.Error()) + return + } + currentNode, err := logic.GetNodeByID(id) + if err != nil { + logger.Log(1, "error getting node ", id, err.Error()) + return + } + decrypted, decryptErr := decryptMsg(&currentNode, msg.Payload()) + if decryptErr != nil { + logger.Log(1, "failed to decrypt message for node ", id, decryptErr.Error()) + return + } + var newNode models.Node + if err := json.Unmarshal(decrypted, &newNode); err != nil { + logger.Log(1, "error unmarshaling payload ", err.Error()) + return + } + if err := logic.UpdateNode(&currentNode, &newNode); err != nil { + logger.Log(1, "error saving node", err.Error()) + return + } + logger.Log(1, "updated node", id, newNode.Name) + }() +} + +// ClientPeerUpdate message handler -- handles updating peers after signal from client nodes +func ClientPeerUpdate(client mqtt.Client, msg mqtt.Message) { + go func() { + id, err := getID(msg.Topic()) + if err != nil { + logger.Log(1, "error getting node.ID sent on ", msg.Topic(), err.Error()) + return + } + currentNode, err := logic.GetNodeByID(id) + if err != nil { + logger.Log(1, "error getting node ", id, err.Error()) + return + } + if err := PublishPeerUpdate(&currentNode); err != nil { + logger.Log(1, "error publishing peer update ", err.Error()) + return + } + logger.Log(1, "sent peer updates after signal received from", id, currentNode.Name) + }() +} diff --git a/mq/mq.go b/mq/mq.go index 8c64ba10..9cbdb2ec 100644 --- a/mq/mq.go +++ b/mq/mq.go @@ -2,18 +2,11 @@ package mq import ( "context" - "encoding/json" - "errors" - "fmt" "log" - "strings" "time" mqtt "github.com/eclipse/paho.mqtt.golang" - "github.com/gravitl/netmaker/database" "github.com/gravitl/netmaker/logger" - 
"github.com/gravitl/netmaker/logic" - "github.com/gravitl/netmaker/models" "github.com/gravitl/netmaker/netclient/ncutils" "github.com/gravitl/netmaker/servercfg" ) @@ -25,171 +18,6 @@ const MQ_DISCONNECT = 250 var peer_force_send = 0 -// DefaultHandler default message queue handler - only called when GetDebug == true -func DefaultHandler(client mqtt.Client, msg mqtt.Message) { - logger.Log(0, "MQTT Message: Topic: ", string(msg.Topic()), " Message: ", string(msg.Payload())) -} - -// Ping message Handler -- handles ping topic from client nodes -func Ping(client mqtt.Client, msg mqtt.Message) { - logger.Log(0, "Ping Handler: ", msg.Topic()) - go func() { - id, err := GetID(msg.Topic()) - if err != nil { - logger.Log(0, "error getting node.ID sent on ping topic ") - return - } - node, err := logic.GetNodeByID(id) - if err != nil { - logger.Log(0, "mq-ping error getting node: ", err.Error()) - record, err := database.FetchRecord(database.NODES_TABLE_NAME, id) - if err != nil { - logger.Log(0, "error reading database ", err.Error()) - return - } - logger.Log(0, "record from database") - logger.Log(0, record) - return - } - _, decryptErr := decryptMsg(&node, msg.Payload()) - if decryptErr != nil { - logger.Log(0, "error decrypting when updating node ", node.ID, decryptErr.Error()) - return - } - node.SetLastCheckIn() - if err := logic.UpdateNode(&node, &node); err != nil { - logger.Log(0, "error updating node", node.Name, node.ID, " on checkin", err.Error()) - return - } - logger.Log(3, "ping processed for node", node.ID) - // --TODO --set client version once feature is implemented. 
- //node.SetClientVersion(msg.Payload()) - }() -} - -// UpdateNode message Handler -- handles updates from client nodes -func UpdateNode(client mqtt.Client, msg mqtt.Message) { - go func() { - id, err := GetID(msg.Topic()) - if err != nil { - logger.Log(1, "error getting node.ID sent on ", msg.Topic(), err.Error()) - return - } - currentNode, err := logic.GetNodeByID(id) - if err != nil { - logger.Log(1, "error getting node ", id, err.Error()) - return - } - decrypted, decryptErr := decryptMsg(&currentNode, msg.Payload()) - if decryptErr != nil { - logger.Log(1, "failed to decrypt message for node ", id, decryptErr.Error()) - return - } - var newNode models.Node - if err := json.Unmarshal(decrypted, &newNode); err != nil { - logger.Log(1, "error unmarshaling payload ", err.Error()) - return - } - if err := logic.UpdateNode(&currentNode, &newNode); err != nil { - logger.Log(1, "error saving node", err.Error()) - return - } - if err := PublishPeerUpdate(&newNode); err != nil { - logger.Log(1, "error publishing peer update ", err.Error()) - return - } - logger.Log(1, "Updated node", id, newNode.Name) - }() -} - -// PublishPeerUpdate --- deterines and publishes a peer update to all the peers of a node -func PublishPeerUpdate(newNode *models.Node) error { - if !servercfg.IsMessageQueueBackend() { - return nil - } - networkNodes, err := logic.GetNetworkNodes(newNode.Network) - if err != nil { - logger.Log(1, "err getting Network Nodes", err.Error()) - return err - } - for _, node := range networkNodes { - - if node.IsServer == "yes" || node.ID == newNode.ID { - continue - } - peerUpdate, err := logic.GetPeerUpdate(&node) - if err != nil { - logger.Log(1, "error getting peer update for node", node.ID, err.Error()) - continue - } - data, err := json.Marshal(&peerUpdate) - if err != nil { - logger.Log(2, "error marshaling peer update for node", node.ID, err.Error()) - continue - } - if err = publish(&node, fmt.Sprintf("peers/%s/%s", node.Network, node.ID), data); err != nil { - 
logger.Log(1, "failed to publish peer update for node", node.ID) - } else { - logger.Log(1, "sent peer update for node", node.Name, "on network:", node.Network) - } - } - return nil -} - -// PublishPeerUpdate --- deterines and publishes a peer update to all the peers of a node -func PublishExtPeerUpdate(node *models.Node) error { - var err error - if logic.IsLocalServer(node) { - if err = logic.ServerUpdate(node, false); err != nil { - logger.Log(1, "server node:", node.ID, "failed to update peers with ext clients") - return err - } else { - return nil - } - } - if !servercfg.IsMessageQueueBackend() { - return nil - } - peerUpdate, err := logic.GetPeerUpdate(node) - if err != nil { - return err - } - data, err := json.Marshal(&peerUpdate) - if err != nil { - return err - } - return publish(node, fmt.Sprintf("peers/%s/%s", node.Network, node.ID), data) -} - -// GetID -- decodes a message queue topic and returns the embedded node.ID -func GetID(topic string) (string, error) { - parts := strings.Split(topic, "/") - count := len(parts) - if count == 1 { - return "", errors.New("invalid topic") - } - //the last part of the topic will be the node.ID - return parts[count-1], nil -} - -// NodeUpdate -- publishes a node update -func NodeUpdate(node *models.Node) error { - if !servercfg.IsMessageQueueBackend() { - return nil - } - logger.Log(3, "publishing node update to "+node.Name) - data, err := json.Marshal(node) - if err != nil { - logger.Log(2, "error marshalling node update ", err.Error()) - return err - } - if err = publish(node, fmt.Sprintf("update/%s/%s", node.Network, node.ID), data); err != nil { - logger.Log(2, "error publishing node update to peer ", node.ID, err.Error()) - return err - } - return nil -} - // SetupMQTT creates a connection to broker and return client func SetupMQTT(publish bool) mqtt.Client { opts := mqtt.NewClientOptions() @@ -217,6 +45,10 @@ func SetupMQTT(publish bool) mqtt.Client { client.Disconnect(240) logger.Log(0, "node update 
subscription failed") } + if token := client.Subscribe("signal/#", 0, mqtt.MessageHandler(ClientPeerUpdate)); token.Wait() && token.Error() != nil { + client.Disconnect(240) + logger.Log(0, "node client subscription failed") + } opts.SetOrderMatters(true) opts.SetResumeSubs(true) @@ -249,54 +81,3 @@ func Keepalive(ctx context.Context) { } } } - -// sendPeers - retrieve networks, send peer ports to all peers -func sendPeers() { - var force bool - peer_force_send++ - if peer_force_send == 5 { - force = true - peer_force_send = 0 - } - networks, err := logic.GetNetworks() - if err != nil { - logger.Log(1, "error retrieving networks for keepalive", err.Error()) - } - for _, network := range networks { - serverNode, errN := logic.GetNetworkServerLeader(network.NetID) - if errN == nil { - serverNode.SetLastCheckIn() - logic.UpdateNode(&serverNode, &serverNode) - if network.DefaultUDPHolePunch == "yes" { - if logic.ShouldPublishPeerPorts(&serverNode) || force { - if force { - logger.Log(2, "sending scheduled peer update (5 min)") - } - err = PublishPeerUpdate(&serverNode) - if err != nil { - logger.Log(1, "error publishing udp port updates for network", network.NetID) - logger.Log(1, errN.Error()) - } - } - } - } else { - logger.Log(1, "unable to retrieve leader for network ", network.NetID) - logger.Log(1, errN.Error()) - continue - } - } -} - -// func publishServerKeepalive(client mqtt.Client, network *models.Network) { -// nodes, err := logic.GetNetworkNodes(network.NetID) -// if err != nil { -// return -// } -// for _, node := range nodes { -// if token := client.Publish(fmt.Sprintf("serverkeepalive/%s/%s", network.NetID, node.ID), 0, false, servercfg.GetVersion()); token.Wait() && token.Error() != nil { -// logger.Log(1, "error publishing server keepalive for network", network.NetID, token.Error().Error()) -// } else { -// logger.Log(2, "keepalive sent for network/node", network.NetID, node.ID) -// } -// } -// } diff --git a/mq/publishers.go b/mq/publishers.go new 
file mode 100644 index 00000000..35177934 --- /dev/null +++ b/mq/publishers.go @@ -0,0 +1,125 @@ +package mq + +import ( + "encoding/json" + "fmt" + + "github.com/gravitl/netmaker/logger" + "github.com/gravitl/netmaker/logic" + "github.com/gravitl/netmaker/models" + "github.com/gravitl/netmaker/servercfg" +) + +// PublishPeerUpdate --- determines and publishes a peer update to all the peers of a node +func PublishPeerUpdate(newNode *models.Node) error { + if !servercfg.IsMessageQueueBackend() { + return nil + } + networkNodes, err := logic.GetNetworkNodes(newNode.Network) + if err != nil { + logger.Log(1, "err getting Network Nodes", err.Error()) + return err + } + for _, node := range networkNodes { + + if node.IsServer == "yes" || node.ID == newNode.ID { + continue + } + peerUpdate, err := logic.GetPeerUpdate(&node) + if err != nil { + logger.Log(1, "error getting peer update for node", node.ID, err.Error()) + continue + } + data, err := json.Marshal(&peerUpdate) + if err != nil { + logger.Log(2, "error marshaling peer update for node", node.ID, err.Error()) + continue + } + if err = publish(&node, fmt.Sprintf("peers/%s/%s", node.Network, node.ID), data); err != nil { + logger.Log(1, "failed to publish peer update for node", node.ID) + } else { + logger.Log(1, "sent peer update for node", node.Name, "on network:", node.Network) + } + } + return nil +} + +// PublishExtPeerUpdate --- publishes a peer update to the given node, or runs a local server update when the node is the local server +func PublishExtPeerUpdate(node *models.Node) error { + var err error + if logic.IsLocalServer(node) { + if err = logic.ServerUpdate(node, false); err != nil { + logger.Log(1, "server node:", node.ID, "failed to update peers with ext clients") + return err + } else { + return nil + } + } + if !servercfg.IsMessageQueueBackend() { + return nil + } + peerUpdate, err := logic.GetPeerUpdate(node) + if err != nil { + return err + } + data, err := json.Marshal(&peerUpdate) + if err != nil { + return err + } + return publish(node, 
fmt.Sprintf("peers/%s/%s", node.Network, node.ID), data) +} + +// NodeUpdate -- publishes a node update +func NodeUpdate(node *models.Node) error { + if !servercfg.IsMessageQueueBackend() || node.IsServer == "yes" { + return nil + } + logger.Log(3, "publishing node update to "+node.Name) + data, err := json.Marshal(node) + if err != nil { + logger.Log(2, "error marshalling node update ", err.Error()) + return err + } + if err = publish(node, fmt.Sprintf("update/%s/%s", node.Network, node.ID), data); err != nil { + logger.Log(2, "error publishing node update to peer ", node.ID, err.Error()) + return err + } + return nil +} + +// sendPeers - retrieve networks, send peer ports to all peers +func sendPeers() { + var force bool + peer_force_send++ + if peer_force_send == 5 { + force = true + peer_force_send = 0 + } + networks, err := logic.GetNetworks() + if err != nil { + logger.Log(1, "error retrieving networks for keepalive", err.Error()) + } + for _, network := range networks { + serverNode, errN := logic.GetNetworkServerLeader(network.NetID) + if errN == nil { + serverNode.SetLastCheckIn() + logic.UpdateNode(&serverNode, &serverNode) + if network.DefaultUDPHolePunch == "yes" { + if logic.ShouldPublishPeerPorts(&serverNode) || force { + if force { + logger.Log(2, "sending scheduled peer update (5 min)") + } + err = PublishPeerUpdate(&serverNode) + if err != nil { + logger.Log(1, "error publishing udp port updates for network", network.NetID) + logger.Log(1, errN.Error()) + } + } + } + } else { + logger.Log(1, "unable to retrieve leader for network ", network.NetID) + logger.Log(1, errN.Error()) + continue + } + } +} diff --git a/mq/util.go b/mq/util.go index 8c7b2e01..8421021c 100644 --- a/mq/util.go +++ b/mq/util.go @@ -70,3 +70,14 @@ func publish(node *models.Node, dest string, msg []byte) error { } return nil } + +// decodes a message queue topic and returns the embedded node.ID +func getID(topic string) (string, error) { + parts := strings.Split(topic, "/") + 
count := len(parts) + if count == 1 { + return "", fmt.Errorf("invalid topic") + } + //the last part of the topic will be the node.ID + return parts[count-1], nil +} diff --git a/netclient/functions/daemon.go b/netclient/functions/daemon.go index f2d3b612..09fa0857 100644 --- a/netclient/functions/daemon.go +++ b/netclient/functions/daemon.go @@ -26,7 +26,6 @@ import ( ) // == Message Caches == -// var keepalive = new(sync.Map) var messageCache = new(sync.Map) var networkcontext = new(sync.Map) @@ -88,94 +87,6 @@ func Daemon() error { } -// SetupMQTT creates a connection to broker and return client -func SetupMQTT(cfg *config.ClientConfig, publish bool) mqtt.Client { - opts := mqtt.NewClientOptions() - server := getServerAddress(cfg) - opts.AddBroker(server + ":1883") - id := ncutils.MakeRandomString(23) - opts.ClientID = id - opts.SetDefaultPublishHandler(All) - opts.SetAutoReconnect(true) - opts.SetConnectRetry(true) - opts.SetConnectRetryInterval(time.Second << 2) - opts.SetKeepAlive(time.Minute >> 1) - opts.SetWriteTimeout(time.Minute) - opts.SetOnConnectHandler(func(client mqtt.Client) { - if !publish { - if cfg.DebugOn { - if token := client.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil { - ncutils.Log(token.Error().Error()) - return - } - ncutils.Log("subscribed to all topics for debugging purposes") - } - if token := client.Subscribe(fmt.Sprintf("update/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(NodeUpdate)); token.Wait() && token.Error() != nil { - ncutils.Log(token.Error().Error()) - return - } - if cfg.DebugOn { - ncutils.Log(fmt.Sprintf("subscribed to node updates for node %s update/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID)) - } - if token := client.Subscribe(fmt.Sprintf("peers/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(UpdatePeers)); token.Wait() && token.Error() != nil { - ncutils.Log(token.Error().Error()) - return - } - if cfg.DebugOn { - ncutils.Log(fmt.Sprintf("subscribed to peer 
updates for node %s peers/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID)) - } - opts.SetOrderMatters(true) - opts.SetResumeSubs(true) - } - }) - opts.SetConnectionLostHandler(func(c mqtt.Client, e error) { - ncutils.Log("detected broker connection lost, running pull for " + cfg.Node.Network) - _, err := Pull(cfg.Node.Network, true) - if err != nil { - ncutils.Log("could not run pull, server unreachable: " + err.Error()) - ncutils.Log("waiting to retry...") - /* - //Consider putting in logic to restart - daemon may take long time to refresh - time.Sleep(time.Minute * 5) - ncutils.Log("restarting netclient") - daemon.Restart() - */ - } - ncutils.Log("connection re-established with mqtt server") - }) - - client := mqtt.NewClient(opts) - tperiod := time.Now().Add(12 * time.Second) - for { - //if after 12 seconds, try a gRPC pull on the last try - if time.Now().After(tperiod) { - ncutils.Log("running pull for " + cfg.Node.Network) - _, err := Pull(cfg.Node.Network, true) - if err != nil { - ncutils.Log("could not run pull, exiting " + cfg.Node.Network + " setup: " + err.Error()) - return client - } - time.Sleep(time.Second) - } - if token := client.Connect(); token.Wait() && token.Error() != nil { - ncutils.Log("unable to connect to broker, retrying ...") - if time.Now().After(tperiod) { - ncutils.Log("could not connect to broker, exiting " + cfg.Node.Network + " setup: " + token.Error().Error()) - if strings.Contains(token.Error().Error(), "connectex") || strings.Contains(token.Error().Error(), "i/o timeout") { - ncutils.PrintLog("connection issue detected.. 
pulling and restarting daemon", 0) - Pull(cfg.Node.Network, true) - daemon.Restart() - } - return client - } - } else { - break - } - time.Sleep(2 * time.Second) - } - return client -} - // MessageQueue sets up Message Queue and subsribes/publishes updates to/from server func MessageQueue(ctx context.Context, network string) { ncutils.Log("netclient go routine started for " + network) @@ -185,7 +96,7 @@ func MessageQueue(ctx context.Context, network string) { cfg.ReadConfig() ncutils.Log("daemon started for network: " + network) - client := SetupMQTT(&cfg, false) + client := setupMQTT(&cfg, false) defer client.Disconnect(250) wg := &sync.WaitGroup{} @@ -287,15 +198,15 @@ func NodeUpdate(client mqtt.Client, msg mqtt.Message) { ncutils.Log("error updating wireguard config " + err.Error()) return } - if ifaceDelta { + if ifaceDelta { // if a change caused an ifacedelta we need to notify the server to update the peers ncutils.Log("applying WG conf to " + file) err = wireguard.ApplyConf(&cfg.Node, cfg.Node.Interface, file) if err != nil { ncutils.Log("error restarting wg after node update " + err.Error()) return } - time.Sleep(time.Second >> 1) + time.Sleep(time.Second >> 1) if newNode.DNSOn == "yes" { for _, server := range newNode.NetworkSettings.DefaultServerAddrs { if server.IsLeader { @@ -304,6 +215,7 @@ func NodeUpdate(client mqtt.Client, msg mqtt.Message) { } } } + publishClientPeers(&cfg) } //deal with DNS if newNode.DNSOn != "yes" && shouldDNSChange && cfg.Node.Interface != "" { @@ -361,39 +273,6 @@ func UpdatePeers(client mqtt.Client, msg mqtt.Message) { } } -// MonitorKeepalive - checks time last server keepalive received. 
If more than 3+ minutes, notify and resubscribe -// func MonitorKeepalive(ctx context.Context, wg *sync.WaitGroup, client mqtt.Client, cfg *config.ClientConfig) { -// defer wg.Done() -// for { -// select { -// case <-ctx.Done(): -// ncutils.Log("cancel recieved, monitor keepalive exiting") -// return -// case <-time.After(time.Second * 150): -// var keepalivetime time.Time -// keepaliveval, ok := keepalive.Load(cfg.Node.Network) -// if ok { -// keepalivetime = keepaliveval.(time.Time) -// if !keepalivetime.IsZero() && time.Since(keepalivetime) > time.Second*120 { // more than 2+ minutes -// // ncutils.Log("server keepalive not recieved recently, resubscribe to message queue") -// // err := Resubscribe(client, cfg) -// // if err != nil { -// // ncutils.Log("closing " + err.Error()) -// // } -// ncutils.Log("maybe wanna call something") -// } -// } -// } -// } -// } - -// ServerKeepAlive -- handler to react to keepalive messages published by server -// func ServerKeepAlive(client mqtt.Client, msg mqtt.Message) { -// var currentTime = time.Now() -// keepalive.Store(parseNetworkFromTopic(msg.Topic()), currentTime) -// ncutils.PrintLog("received server keepalive at "+currentTime.String(), 2) -// } - // UpdateKeys -- updates private key and returns new publickey func UpdateKeys(cfg *config.ClientConfig, client mqtt.Client) error { ncutils.Log("received message to update keys") @@ -487,7 +366,7 @@ func PublishNodeUpdate(cfg *config.ClientConfig) error { if err != nil { return err } - if err = publish(cfg, fmt.Sprintf("update/%s", cfg.Node.ID), data); err != nil { + if err = publish(cfg, fmt.Sprintf("update/%s", cfg.Node.ID), data, 1); err != nil { return err } return nil @@ -495,19 +374,115 @@ func PublishNodeUpdate(cfg *config.ClientConfig) error { // Hello -- ping the broker to let server know node is alive and doing fine func Hello(cfg *config.ClientConfig, network string) { - if err := publish(cfg, fmt.Sprintf("ping/%s", cfg.Node.ID), []byte(ncutils.Version)); err != 
nil { + if err := publish(cfg, fmt.Sprintf("ping/%s", cfg.Node.ID), []byte(ncutils.Version), 0); err != nil { ncutils.Log(fmt.Sprintf("error publishing ping, %v", err)) ncutils.Log("running pull on " + cfg.Node.Network + " to reconnect") _, err := Pull(cfg.Node.Network, true) if err != nil { ncutils.Log("could not run pull on " + cfg.Node.Network + ", error: " + err.Error()) } - } } // == Private == +// setupMQTT creates a connection to broker and return client +func setupMQTT(cfg *config.ClientConfig, publish bool) mqtt.Client { + opts := mqtt.NewClientOptions() + server := getServerAddress(cfg) + opts.AddBroker(server + ":1883") + id := ncutils.MakeRandomString(23) + opts.ClientID = id + opts.SetDefaultPublishHandler(All) + opts.SetAutoReconnect(true) + opts.SetConnectRetry(true) + opts.SetConnectRetryInterval(time.Second << 2) + opts.SetKeepAlive(time.Minute >> 1) + opts.SetWriteTimeout(time.Minute) + opts.SetOnConnectHandler(func(client mqtt.Client) { + if !publish { + if cfg.DebugOn { + if token := client.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil { + ncutils.Log(token.Error().Error()) + return + } + ncutils.Log("subscribed to all topics for debugging purposes") + } + if token := client.Subscribe(fmt.Sprintf("update/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(NodeUpdate)); token.Wait() && token.Error() != nil { + ncutils.Log(token.Error().Error()) + return + } + if cfg.DebugOn { + ncutils.Log(fmt.Sprintf("subscribed to node updates for node %s update/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID)) + } + if token := client.Subscribe(fmt.Sprintf("peers/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(UpdatePeers)); token.Wait() && token.Error() != nil { + ncutils.Log(token.Error().Error()) + return + } + if cfg.DebugOn { + ncutils.Log(fmt.Sprintf("subscribed to peer updates for node %s peers/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID)) + } + opts.SetOrderMatters(true) + opts.SetResumeSubs(true) 
+ } + }) + opts.SetConnectionLostHandler(func(c mqtt.Client, e error) { + ncutils.Log("detected broker connection lost, running pull for " + cfg.Node.Network) + _, err := Pull(cfg.Node.Network, true) + if err != nil { + ncutils.Log("could not run pull, server unreachable: " + err.Error()) + ncutils.Log("waiting to retry...") + /* + //Consider putting in logic to restart - daemon may take long time to refresh + time.Sleep(time.Minute * 5) + ncutils.Log("restarting netclient") + daemon.Restart() + */ + } + ncutils.Log("connection re-established with mqtt server") + }) + + client := mqtt.NewClient(opts) + tperiod := time.Now().Add(12 * time.Second) + for { + //if after 12 seconds, try a gRPC pull on the last try + if time.Now().After(tperiod) { + ncutils.Log("running pull for " + cfg.Node.Network) + _, err := Pull(cfg.Node.Network, true) + if err != nil { + ncutils.Log("could not run pull, exiting " + cfg.Node.Network + " setup: " + err.Error()) + return client + } + time.Sleep(time.Second) + } + if token := client.Connect(); token.Wait() && token.Error() != nil { + ncutils.Log("unable to connect to broker, retrying ...") + if time.Now().After(tperiod) { + ncutils.Log("could not connect to broker, exiting " + cfg.Node.Network + " setup: " + token.Error().Error()) + if strings.Contains(token.Error().Error(), "connectex") || strings.Contains(token.Error().Error(), "i/o timeout") { + ncutils.PrintLog("connection issue detected.. 
pulling and restarting daemon", 0) + Pull(cfg.Node.Network, true) + daemon.Restart() + } + return client + } + } else { + break + } + time.Sleep(2 * time.Second) + } + return client +} + +// publishes a message to server to update peers on this peer's behalf +func publishClientPeers(cfg *config.ClientConfig) error { + payload := []byte(ncutils.MakeRandomString(16)) // just random string for now to keep the bytes different + if err := publish(cfg, fmt.Sprintf("signal/%s", cfg.Node.ID), payload, 1); err != nil { + return err + } + return nil +} + func initialPull(network string) { ncutils.Log("pulling latest config for " + network) var configPath = fmt.Sprintf("%snetconfig-%s", ncutils.GetNetclientPathSpecific(), network) @@ -536,7 +511,7 @@ func initialPull(network string) { } } -func publish(cfg *config.ClientConfig, dest string, msg []byte) error { +func publish(cfg *config.ClientConfig, dest string, msg []byte, qos byte) error { // setup the keys trafficPrivKey, err := auth.RetrieveTrafficKey(cfg.Node.Network) if err != nil { @@ -548,14 +523,14 @@ func publish(cfg *config.ClientConfig, dest string, msg []byte) error { return err } - client := SetupMQTT(cfg, true) + client := setupMQTT(cfg, true) defer client.Disconnect(250) encrypted, err := ncutils.Chunk(msg, serverPubKey, trafficPrivKey) if err != nil { return err } - if token := client.Publish(dest, 0, false, encrypted); token.Wait() && token.Error() != nil { + if token := client.Publish(dest, qos, false, encrypted); token.Wait() && token.Error() != nil { return token.Error() } return nil