diff --git a/mq/handlers.go b/mq/handlers.go new file mode 100644 index 00000000..8f624ad9 --- /dev/null +++ b/mq/handlers.go @@ -0,0 +1,105 @@ +package mq + +import ( + "encoding/json" + + mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/gravitl/netmaker/database" + "github.com/gravitl/netmaker/logger" + "github.com/gravitl/netmaker/logic" + "github.com/gravitl/netmaker/models" +) + +// DefaultHandler default message queue handler - only called when GetDebug == true +func DefaultHandler(client mqtt.Client, msg mqtt.Message) { + logger.Log(0, "MQTT Message: Topic: ", string(msg.Topic()), " Message: ", string(msg.Payload())) +} + +// Ping message Handler -- handles ping topic from client nodes +func Ping(client mqtt.Client, msg mqtt.Message) { + logger.Log(0, "Ping Handler: ", msg.Topic()) + go func() { + id, err := getID(msg.Topic()) + if err != nil { + logger.Log(0, "error getting node.ID sent on ping topic ") + return + } + node, err := logic.GetNodeByID(id) + if err != nil { + logger.Log(0, "mq-ping error getting node: ", err.Error()) + record, err := database.FetchRecord(database.NODES_TABLE_NAME, id) + if err != nil { + logger.Log(0, "error reading database ", err.Error()) + return + } + logger.Log(0, "record from database") + logger.Log(0, record) + return + } + _, decryptErr := decryptMsg(&node, msg.Payload()) + if decryptErr != nil { + logger.Log(0, "error decrypting when updating node ", node.ID, decryptErr.Error()) + return + } + node.SetLastCheckIn() + if err := logic.UpdateNode(&node, &node); err != nil { + logger.Log(0, "error updating node", node.Name, node.ID, " on checkin", err.Error()) + return + } + logger.Log(3, "ping processed for node", node.ID) + // --TODO --set client version once feature is implemented. 
+ //node.SetClientVersion(msg.Payload()) + }() +} + +// UpdateNode message Handler -- handles updates from client nodes +func UpdateNode(client mqtt.Client, msg mqtt.Message) { + go func() { + id, err := getID(msg.Topic()) + if err != nil { + logger.Log(1, "error getting node.ID sent on ", msg.Topic(), err.Error()) + return + } + currentNode, err := logic.GetNodeByID(id) + if err != nil { + logger.Log(1, "error getting node ", id, err.Error()) + return + } + decrypted, decryptErr := decryptMsg(&currentNode, msg.Payload()) + if decryptErr != nil { + logger.Log(1, "failed to decrypt message for node ", id, decryptErr.Error()) + return + } + var newNode models.Node + if err := json.Unmarshal(decrypted, &newNode); err != nil { + logger.Log(1, "error unmarshaling payload ", err.Error()) + return + } + if err := logic.UpdateNode(&currentNode, &newNode); err != nil { + logger.Log(1, "error saving node", err.Error()) + return + } + logger.Log(1, "updated node", id, newNode.Name) + }() +} + +// ClientPeerUpdate message handler -- handles updating peers after signal from client nodes +func ClientPeerUpdate(client mqtt.Client, msg mqtt.Message) { + go func() { + id, err := getID(msg.Topic()) + if err != nil { + logger.Log(1, "error getting node.ID sent on ", msg.Topic(), err.Error()) + return + } + currentNode, err := logic.GetNodeByID(id) + if err != nil { + logger.Log(1, "error getting node ", id, err.Error()) + return + } + if err := PublishPeerUpdate(&currentNode); err != nil { + logger.Log(1, "error publishing peer update ", err.Error()) + return + } + logger.Log(1, "sent peer updates after signal received from", id, currentNode.Name) + }() +} diff --git a/mq/mq.go b/mq/mq.go index 8c64ba10..6ce8ec3d 100644 --- a/mq/mq.go +++ b/mq/mq.go @@ -2,18 +2,11 @@ package mq import ( "context" - "encoding/json" - "errors" - "fmt" "log" - "strings" "time" mqtt "github.com/eclipse/paho.mqtt.golang" - "github.com/gravitl/netmaker/database" "github.com/gravitl/netmaker/logger" - 
"github.com/gravitl/netmaker/logic" - "github.com/gravitl/netmaker/models" "github.com/gravitl/netmaker/netclient/ncutils" "github.com/gravitl/netmaker/servercfg" ) @@ -25,171 +18,6 @@ const MQ_DISCONNECT = 250 var peer_force_send = 0 -// DefaultHandler default message queue handler - only called when GetDebug == true -func DefaultHandler(client mqtt.Client, msg mqtt.Message) { - logger.Log(0, "MQTT Message: Topic: ", string(msg.Topic()), " Message: ", string(msg.Payload())) -} - -// Ping message Handler -- handles ping topic from client nodes -func Ping(client mqtt.Client, msg mqtt.Message) { - logger.Log(0, "Ping Handler: ", msg.Topic()) - go func() { - id, err := GetID(msg.Topic()) - if err != nil { - logger.Log(0, "error getting node.ID sent on ping topic ") - return - } - node, err := logic.GetNodeByID(id) - if err != nil { - logger.Log(0, "mq-ping error getting node: ", err.Error()) - record, err := database.FetchRecord(database.NODES_TABLE_NAME, id) - if err != nil { - logger.Log(0, "error reading database ", err.Error()) - return - } - logger.Log(0, "record from database") - logger.Log(0, record) - return - } - _, decryptErr := decryptMsg(&node, msg.Payload()) - if decryptErr != nil { - logger.Log(0, "error decrypting when updating node ", node.ID, decryptErr.Error()) - return - } - node.SetLastCheckIn() - if err := logic.UpdateNode(&node, &node); err != nil { - logger.Log(0, "error updating node", node.Name, node.ID, " on checkin", err.Error()) - return - } - logger.Log(3, "ping processed for node", node.ID) - // --TODO --set client version once feature is implemented. 
- //node.SetClientVersion(msg.Payload()) - }() -} - -// UpdateNode message Handler -- handles updates from client nodes -func UpdateNode(client mqtt.Client, msg mqtt.Message) { - go func() { - id, err := GetID(msg.Topic()) - if err != nil { - logger.Log(1, "error getting node.ID sent on ", msg.Topic(), err.Error()) - return - } - currentNode, err := logic.GetNodeByID(id) - if err != nil { - logger.Log(1, "error getting node ", id, err.Error()) - return - } - decrypted, decryptErr := decryptMsg(&currentNode, msg.Payload()) - if decryptErr != nil { - logger.Log(1, "failed to decrypt message for node ", id, decryptErr.Error()) - return - } - var newNode models.Node - if err := json.Unmarshal(decrypted, &newNode); err != nil { - logger.Log(1, "error unmarshaling payload ", err.Error()) - return - } - if err := logic.UpdateNode(&currentNode, &newNode); err != nil { - logger.Log(1, "error saving node", err.Error()) - return - } - if err := PublishPeerUpdate(&newNode); err != nil { - logger.Log(1, "error publishing peer update ", err.Error()) - return - } - logger.Log(1, "Updated node", id, newNode.Name) - }() -} - -// PublishPeerUpdate --- deterines and publishes a peer update to all the peers of a node -func PublishPeerUpdate(newNode *models.Node) error { - if !servercfg.IsMessageQueueBackend() { - return nil - } - networkNodes, err := logic.GetNetworkNodes(newNode.Network) - if err != nil { - logger.Log(1, "err getting Network Nodes", err.Error()) - return err - } - for _, node := range networkNodes { - - if node.IsServer == "yes" || node.ID == newNode.ID { - continue - } - peerUpdate, err := logic.GetPeerUpdate(&node) - if err != nil { - logger.Log(1, "error getting peer update for node", node.ID, err.Error()) - continue - } - data, err := json.Marshal(&peerUpdate) - if err != nil { - logger.Log(2, "error marshaling peer update for node", node.ID, err.Error()) - continue - } - if err = publish(&node, fmt.Sprintf("peers/%s/%s", node.Network, node.ID), data); err != nil { - 
logger.Log(1, "failed to publish peer update for node", node.ID) - } else { - logger.Log(1, "sent peer update for node", node.Name, "on network:", node.Network) - } - } - return nil -} - -// PublishPeerUpdate --- deterines and publishes a peer update to all the peers of a node -func PublishExtPeerUpdate(node *models.Node) error { - var err error - if logic.IsLocalServer(node) { - if err = logic.ServerUpdate(node, false); err != nil { - logger.Log(1, "server node:", node.ID, "failed to update peers with ext clients") - return err - } else { - return nil - } - } - if !servercfg.IsMessageQueueBackend() { - return nil - } - peerUpdate, err := logic.GetPeerUpdate(node) - if err != nil { - return err - } - data, err := json.Marshal(&peerUpdate) - if err != nil { - return err - } - return publish(node, fmt.Sprintf("peers/%s/%s", node.Network, node.ID), data) -} - -// GetID -- decodes a message queue topic and returns the embedded node.ID -func GetID(topic string) (string, error) { - parts := strings.Split(topic, "/") - count := len(parts) - if count == 1 { - return "", errors.New("invalid topic") - } - //the last part of the topic will be the node.ID - return parts[count-1], nil -} - -// NodeUpdate -- publishes a node update -func NodeUpdate(node *models.Node) error { - if !servercfg.IsMessageQueueBackend() { - return nil - } - logger.Log(3, "publishing node update to "+node.Name) - data, err := json.Marshal(node) - if err != nil { - logger.Log(2, "error marshalling node update ", err.Error()) - return err - } - if err = publish(node, fmt.Sprintf("update/%s/%s", node.Network, node.ID), data); err != nil { - logger.Log(2, "error publishing node update to peer ", node.ID, err.Error()) - return err - } - return nil -} - // SetupMQTT creates a connection to broker and return client func SetupMQTT(publish bool) mqtt.Client { opts := mqtt.NewClientOptions() @@ -217,6 +45,10 @@ func SetupMQTT(publish bool) mqtt.Client { client.Disconnect(240) logger.Log(0, "node update 
subscription failed") } + if token := client.Subscribe("clients/#", 0, mqtt.MessageHandler(ClientPeerUpdate)); token.Wait() && token.Error() != nil { + client.Disconnect(240) + logger.Log(0, "node client subscription failed") + } opts.SetOrderMatters(true) opts.SetResumeSubs(true) @@ -249,54 +81,3 @@ func Keepalive(ctx context.Context) { } } } - -// sendPeers - retrieve networks, send peer ports to all peers -func sendPeers() { - var force bool - peer_force_send++ - if peer_force_send == 5 { - force = true - peer_force_send = 0 - } - networks, err := logic.GetNetworks() - if err != nil { - logger.Log(1, "error retrieving networks for keepalive", err.Error()) - } - for _, network := range networks { - serverNode, errN := logic.GetNetworkServerLeader(network.NetID) - if errN == nil { - serverNode.SetLastCheckIn() - logic.UpdateNode(&serverNode, &serverNode) - if network.DefaultUDPHolePunch == "yes" { - if logic.ShouldPublishPeerPorts(&serverNode) || force { - if force { - logger.Log(2, "sending scheduled peer update (5 min)") - } - err = PublishPeerUpdate(&serverNode) - if err != nil { - logger.Log(1, "error publishing udp port updates for network", network.NetID) - logger.Log(1, errN.Error()) - } - } - } - } else { - logger.Log(1, "unable to retrieve leader for network ", network.NetID) - logger.Log(1, errN.Error()) - continue - } - } -} - -// func publishServerKeepalive(client mqtt.Client, network *models.Network) { -// nodes, err := logic.GetNetworkNodes(network.NetID) -// if err != nil { -// return -// } -// for _, node := range nodes { -// if token := client.Publish(fmt.Sprintf("serverkeepalive/%s/%s", network.NetID, node.ID), 0, false, servercfg.GetVersion()); token.Wait() && token.Error() != nil { -// logger.Log(1, "error publishing server keepalive for network", network.NetID, token.Error().Error()) -// } else { -// logger.Log(2, "keepalive sent for network/node", network.NetID, node.ID) -// } -// } -// } diff --git a/mq/publishers.go b/mq/publishers.go new 
file mode 100644 index 00000000..b7b32e2d --- /dev/null +++ b/mq/publishers.go @@ -0,0 +1,125 @@ +package mq + +import ( + "encoding/json" + "fmt" + + "github.com/gravitl/netmaker/logger" + "github.com/gravitl/netmaker/logic" + "github.com/gravitl/netmaker/models" + "github.com/gravitl/netmaker/servercfg" +) + +// PublishPeerUpdate --- determines and publishes a peer update to all the peers of a node +func PublishPeerUpdate(newNode *models.Node) error { + if !servercfg.IsMessageQueueBackend() { + return nil + } + networkNodes, err := logic.GetNetworkNodes(newNode.Network) + if err != nil { + logger.Log(1, "err getting Network Nodes", err.Error()) + return err + } + for _, node := range networkNodes { + + if node.IsServer == "yes" || node.ID == newNode.ID { + continue + } + peerUpdate, err := logic.GetPeerUpdate(&node) + if err != nil { + logger.Log(1, "error getting peer update for node", node.ID, err.Error()) + continue + } + data, err := json.Marshal(&peerUpdate) + if err != nil { + logger.Log(2, "error marshaling peer update for node", node.ID, err.Error()) + continue + } + if err = publish(&node, fmt.Sprintf("peers/%s/%s", node.Network, node.ID), data); err != nil { + logger.Log(1, "failed to publish peer update for node", node.ID) + } else { + logger.Log(1, "sent peer update for node", node.Name, "on network:", node.Network) + } + } + return nil +} + +// PublishExtPeerUpdate --- publishes a peer update to the given node itself (via logic.ServerUpdate when it is the local server) +func PublishExtPeerUpdate(node *models.Node) error { + var err error + if logic.IsLocalServer(node) { + if err = logic.ServerUpdate(node, false); err != nil { + logger.Log(1, "server node:", node.ID, "failed to update peers with ext clients") + return err + } else { + return nil + } + } + if !servercfg.IsMessageQueueBackend() { + return nil + } + peerUpdate, err := logic.GetPeerUpdate(node) + if err != nil { + return err + } + data, err := json.Marshal(&peerUpdate) + if err != nil { + return err + } + return publish(node, 
fmt.Sprintf("peers/%s/%s", node.Network, node.ID), data) +} + +// NodeUpdate -- publishes a node update +func NodeUpdate(node *models.Node) error { + if !servercfg.IsMessageQueueBackend() { + return nil + } + logger.Log(3, "publishing node update to "+node.Name) + data, err := json.Marshal(node) + if err != nil { + logger.Log(2, "error marshalling node update ", err.Error()) + return err + } + if err = publish(node, fmt.Sprintf("update/%s/%s", node.Network, node.ID), data); err != nil { + logger.Log(2, "error publishing node update to peer ", node.ID, err.Error()) + return err + } + return nil +} + +// sendPeers - retrieve networks, send peer ports to all peers +func sendPeers() { + var force bool + peer_force_send++ + if peer_force_send == 5 { + force = true + peer_force_send = 0 + } + networks, err := logic.GetNetworks() + if err != nil { + logger.Log(1, "error retrieving networks for keepalive", err.Error()) + } + for _, network := range networks { + serverNode, errN := logic.GetNetworkServerLeader(network.NetID) + if errN == nil { + serverNode.SetLastCheckIn() + logic.UpdateNode(&serverNode, &serverNode) + if network.DefaultUDPHolePunch == "yes" { + if logic.ShouldPublishPeerPorts(&serverNode) || force { + if force { + logger.Log(2, "sending scheduled peer update (5 min)") + } + err = PublishPeerUpdate(&serverNode) + if err != nil { + logger.Log(1, "error publishing udp port updates for network", network.NetID) + logger.Log(1, errN.Error()) + } + } + } + } else { + logger.Log(1, "unable to retrieve leader for network ", network.NetID) + logger.Log(1, errN.Error()) + continue + } + } +} diff --git a/mq/util.go b/mq/util.go index 8c7b2e01..8421021c 100644 --- a/mq/util.go +++ b/mq/util.go @@ -70,3 +70,14 @@ func publish(node *models.Node, dest string, msg []byte) error { } return nil } + +// decodes a message queue topic and returns the embedded node.ID +func getID(topic string) (string, error) { + parts := strings.Split(topic, "/") + count := len(parts) + if 
count == 1 { + return "", fmt.Errorf("invalid topic") + } + //the last part of the topic will be the node.ID + return parts[count-1], nil +} diff --git a/netclient/functions/daemon.go b/netclient/functions/daemon.go index f2d3b612..7552de99 100644 --- a/netclient/functions/daemon.go +++ b/netclient/functions/daemon.go @@ -88,94 +88,6 @@ func Daemon() error { } -// SetupMQTT creates a connection to broker and return client -func SetupMQTT(cfg *config.ClientConfig, publish bool) mqtt.Client { - opts := mqtt.NewClientOptions() - server := getServerAddress(cfg) - opts.AddBroker(server + ":1883") - id := ncutils.MakeRandomString(23) - opts.ClientID = id - opts.SetDefaultPublishHandler(All) - opts.SetAutoReconnect(true) - opts.SetConnectRetry(true) - opts.SetConnectRetryInterval(time.Second << 2) - opts.SetKeepAlive(time.Minute >> 1) - opts.SetWriteTimeout(time.Minute) - opts.SetOnConnectHandler(func(client mqtt.Client) { - if !publish { - if cfg.DebugOn { - if token := client.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil { - ncutils.Log(token.Error().Error()) - return - } - ncutils.Log("subscribed to all topics for debugging purposes") - } - if token := client.Subscribe(fmt.Sprintf("update/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(NodeUpdate)); token.Wait() && token.Error() != nil { - ncutils.Log(token.Error().Error()) - return - } - if cfg.DebugOn { - ncutils.Log(fmt.Sprintf("subscribed to node updates for node %s update/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID)) - } - if token := client.Subscribe(fmt.Sprintf("peers/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(UpdatePeers)); token.Wait() && token.Error() != nil { - ncutils.Log(token.Error().Error()) - return - } - if cfg.DebugOn { - ncutils.Log(fmt.Sprintf("subscribed to peer updates for node %s peers/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID)) - } - opts.SetOrderMatters(true) - opts.SetResumeSubs(true) - } - }) - 
opts.SetConnectionLostHandler(func(c mqtt.Client, e error) { - ncutils.Log("detected broker connection lost, running pull for " + cfg.Node.Network) - _, err := Pull(cfg.Node.Network, true) - if err != nil { - ncutils.Log("could not run pull, server unreachable: " + err.Error()) - ncutils.Log("waiting to retry...") - /* - //Consider putting in logic to restart - daemon may take long time to refresh - time.Sleep(time.Minute * 5) - ncutils.Log("restarting netclient") - daemon.Restart() - */ - } - ncutils.Log("connection re-established with mqtt server") - }) - - client := mqtt.NewClient(opts) - tperiod := time.Now().Add(12 * time.Second) - for { - //if after 12 seconds, try a gRPC pull on the last try - if time.Now().After(tperiod) { - ncutils.Log("running pull for " + cfg.Node.Network) - _, err := Pull(cfg.Node.Network, true) - if err != nil { - ncutils.Log("could not run pull, exiting " + cfg.Node.Network + " setup: " + err.Error()) - return client - } - time.Sleep(time.Second) - } - if token := client.Connect(); token.Wait() && token.Error() != nil { - ncutils.Log("unable to connect to broker, retrying ...") - if time.Now().After(tperiod) { - ncutils.Log("could not connect to broker, exiting " + cfg.Node.Network + " setup: " + token.Error().Error()) - if strings.Contains(token.Error().Error(), "connectex") || strings.Contains(token.Error().Error(), "i/o timeout") { - ncutils.PrintLog("connection issue detected.. 
pulling and restarting daemon", 0) - Pull(cfg.Node.Network, true) - daemon.Restart() - } - return client - } - } else { - break - } - time.Sleep(2 * time.Second) - } - return client -} - // MessageQueue sets up Message Queue and subsribes/publishes updates to/from server func MessageQueue(ctx context.Context, network string) { ncutils.Log("netclient go routine started for " + network) @@ -185,7 +97,7 @@ func MessageQueue(ctx context.Context, network string) { cfg.ReadConfig() ncutils.Log("daemon started for network: " + network) - client := SetupMQTT(&cfg, false) + client := setupMQTT(&cfg, false) defer client.Disconnect(250) wg := &sync.WaitGroup{} @@ -287,15 +199,15 @@ func NodeUpdate(client mqtt.Client, msg mqtt.Message) { ncutils.Log("error updating wireguard config " + err.Error()) return } - if ifaceDelta { + if ifaceDelta { // if a change caused an ifacedelta we need to notify the server to update the peers ncutils.Log("applying WG conf to " + file) err = wireguard.ApplyConf(&cfg.Node, cfg.Node.Interface, file) if err != nil { ncutils.Log("error restarting wg after node update " + err.Error()) return } - time.Sleep(time.Second >> 1) + time.Sleep(time.Second >> 1) if newNode.DNSOn == "yes" { for _, server := range newNode.NetworkSettings.DefaultServerAddrs { if server.IsLeader { @@ -304,6 +216,7 @@ func NodeUpdate(client mqtt.Client, msg mqtt.Message) { } } } + publishClientPeers(&cfg) } //deal with DNS if newNode.DNSOn != "yes" && shouldDNSChange && cfg.Node.Interface != "" { @@ -361,39 +274,6 @@ func UpdatePeers(client mqtt.Client, msg mqtt.Message) { } } -// MonitorKeepalive - checks time last server keepalive received. 
If more than 3+ minutes, notify and resubscribe -// func MonitorKeepalive(ctx context.Context, wg *sync.WaitGroup, client mqtt.Client, cfg *config.ClientConfig) { -// defer wg.Done() -// for { -// select { -// case <-ctx.Done(): -// ncutils.Log("cancel recieved, monitor keepalive exiting") -// return -// case <-time.After(time.Second * 150): -// var keepalivetime time.Time -// keepaliveval, ok := keepalive.Load(cfg.Node.Network) -// if ok { -// keepalivetime = keepaliveval.(time.Time) -// if !keepalivetime.IsZero() && time.Since(keepalivetime) > time.Second*120 { // more than 2+ minutes -// // ncutils.Log("server keepalive not recieved recently, resubscribe to message queue") -// // err := Resubscribe(client, cfg) -// // if err != nil { -// // ncutils.Log("closing " + err.Error()) -// // } -// ncutils.Log("maybe wanna call something") -// } -// } -// } -// } -// } - -// ServerKeepAlive -- handler to react to keepalive messages published by server -// func ServerKeepAlive(client mqtt.Client, msg mqtt.Message) { -// var currentTime = time.Now() -// keepalive.Store(parseNetworkFromTopic(msg.Topic()), currentTime) -// ncutils.PrintLog("received server keepalive at "+currentTime.String(), 2) -// } - // UpdateKeys -- updates private key and returns new publickey func UpdateKeys(cfg *config.ClientConfig, client mqtt.Client) error { ncutils.Log("received message to update keys") @@ -508,6 +388,103 @@ func Hello(cfg *config.ClientConfig, network string) { // == Private == +// setupMQTT creates a connection to broker and return client +func setupMQTT(cfg *config.ClientConfig, publish bool) mqtt.Client { + opts := mqtt.NewClientOptions() + server := getServerAddress(cfg) + opts.AddBroker(server + ":1883") + id := ncutils.MakeRandomString(23) + opts.ClientID = id + opts.SetDefaultPublishHandler(All) + opts.SetAutoReconnect(true) + opts.SetConnectRetry(true) + opts.SetConnectRetryInterval(time.Second << 2) + opts.SetKeepAlive(time.Minute >> 1) + 
opts.SetWriteTimeout(time.Minute) + opts.SetOnConnectHandler(func(client mqtt.Client) { + if !publish { + if cfg.DebugOn { + if token := client.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil { + ncutils.Log(token.Error().Error()) + return + } + ncutils.Log("subscribed to all topics for debugging purposes") + } + if token := client.Subscribe(fmt.Sprintf("update/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(NodeUpdate)); token.Wait() && token.Error() != nil { + ncutils.Log(token.Error().Error()) + return + } + if cfg.DebugOn { + ncutils.Log(fmt.Sprintf("subscribed to node updates for node %s update/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID)) + } + if token := client.Subscribe(fmt.Sprintf("peers/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(UpdatePeers)); token.Wait() && token.Error() != nil { + ncutils.Log(token.Error().Error()) + return + } + if cfg.DebugOn { + ncutils.Log(fmt.Sprintf("subscribed to peer updates for node %s peers/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID)) + } + opts.SetOrderMatters(true) + opts.SetResumeSubs(true) + } + }) + opts.SetConnectionLostHandler(func(c mqtt.Client, e error) { + ncutils.Log("detected broker connection lost, running pull for " + cfg.Node.Network) + _, err := Pull(cfg.Node.Network, true) + if err != nil { + ncutils.Log("could not run pull, server unreachable: " + err.Error()) + ncutils.Log("waiting to retry...") + /* + //Consider putting in logic to restart - daemon may take long time to refresh + time.Sleep(time.Minute * 5) + ncutils.Log("restarting netclient") + daemon.Restart() + */ + } + ncutils.Log("connection re-established with mqtt server") + }) + + client := mqtt.NewClient(opts) + tperiod := time.Now().Add(12 * time.Second) + for { + //if after 12 seconds, try a gRPC pull on the last try + if time.Now().After(tperiod) { + ncutils.Log("running pull for " + cfg.Node.Network) + _, err := Pull(cfg.Node.Network, true) + if err != nil { + 
ncutils.Log("could not run pull, exiting " + cfg.Node.Network + " setup: " + err.Error()) + return client + } + time.Sleep(time.Second) + } + if token := client.Connect(); token.Wait() && token.Error() != nil { + ncutils.Log("unable to connect to broker, retrying ...") + if time.Now().After(tperiod) { + ncutils.Log("could not connect to broker, exiting " + cfg.Node.Network + " setup: " + token.Error().Error()) + if strings.Contains(token.Error().Error(), "connectex") || strings.Contains(token.Error().Error(), "i/o timeout") { + ncutils.PrintLog("connection issue detected.. pulling and restarting daemon", 0) + Pull(cfg.Node.Network, true) + daemon.Restart() + } + return client + } + } else { + break + } + time.Sleep(2 * time.Second) + } + return client +} + +// publishes a message to server to update peers on this peer's behalf +func publishClientPeers(cfg *config.ClientConfig) error { + payload := []byte(ncutils.MakeRandomString(16)) // just random string for now to keep the bytes different + if err := publish(cfg, fmt.Sprintf("update/%s", cfg.Node.ID), payload); err != nil { + return err + } + return nil +} + func initialPull(network string) { ncutils.Log("pulling latest config for " + network) var configPath = fmt.Sprintf("%snetconfig-%s", ncutils.GetNetclientPathSpecific(), network) @@ -548,7 +525,7 @@ func publish(cfg *config.ClientConfig, dest string, msg []byte) error { return err } - client := SetupMQTT(cfg, true) + client := setupMQTT(cfg, true) defer client.Disconnect(250) encrypted, err := ncutils.Chunk(msg, serverPubKey, trafficPrivKey) if err != nil {