edited so that if certain error detected, restart daemon, log changes

This commit is contained in:
0xdcarns 2022-02-06 15:02:05 -05:00
parent 6160ec2725
commit 8533f456bd
2 changed files with 23 additions and 7 deletions

View file

@ -1,12 +1,18 @@
package mq
import (
"fmt"
"github.com/gravitl/netmaker/logic"
"github.com/gravitl/netmaker/models"
"github.com/gravitl/netmaker/netclient/ncutils"
)
func decryptMsg(node *models.Node, msg []byte) ([]byte, error) {
if len(msg) <= 24 { // make sure message is of appropriate length
return nil, fmt.Errorf("recieved invalid message from broker %s", string(msg))
}
trafficKey, trafficErr := logic.RetrievePrivateTrafficKey() // get server private key
if trafficErr != nil {
return nil, trafficErr

View file

@ -18,13 +18,14 @@ import (
"github.com/gravitl/netmaker/models"
"github.com/gravitl/netmaker/netclient/auth"
"github.com/gravitl/netmaker/netclient/config"
"github.com/gravitl/netmaker/netclient/daemon"
"github.com/gravitl/netmaker/netclient/local"
"github.com/gravitl/netmaker/netclient/ncutils"
"github.com/gravitl/netmaker/netclient/wireguard"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
)
// ServerKeepalive - stores time of last server keepalive message
// == Message Caches ==
var keepalive = new(sync.Map)
var messageCache = new(sync.Map)
@ -41,7 +42,6 @@ func insert(network, which, cache string) {
Message: cache,
LastSeen: time.Now(),
}
ncutils.Log("storing new message: " + cache)
messageCache.Store(fmt.Sprintf("%s%s", network, which), newMessage)
}
@ -54,15 +54,15 @@ func read(network, which string) string {
}
if time.Now().After(readMessage.LastSeen.Add(time.Minute)) { // check if message has been there over a minute
messageCache.Delete(fmt.Sprintf("%s%s", network, which)) // remove old message if expired
ncutils.Log("cached message expired")
return ""
}
ncutils.Log("cache hit, skipping probably " + readMessage.Message)
return readMessage.Message // return current message if not expired
}
return ""
}
// == End Message Caches ==
// Daemon runs netclient daemon from command line
func Daemon() error {
ctx, cancel := context.WithCancel(context.Background())
@ -106,6 +106,12 @@ func SetupMQTT(cfg *config.ClientConfig) mqtt.Client {
ncutils.Log("unable to connect to broker, retrying ...")
if time.Now().After(tperiod) {
ncutils.Log("could not connect to broker, exiting " + cfg.Node.Network + " setup: " + token.Error().Error())
if strings.Contains(token.Error().Error(), "connectex") {
ncutils.PrintLog("connection issue detected.. restarting daemon", 0)
Pull(cfg.Node.Network, true)
daemon.Restart()
os.Exit(2)
}
return client
}
} else {
@ -230,7 +236,8 @@ func NodeUpdate(client mqtt.Client, msg mqtt.Message) {
if currentMessage == string(data) {
return
}
insert(newNode.Network, lastNodeUpdate, string(data))
insert(newNode.Network, lastNodeUpdate, string(data)) // store new message in cache
//check if interface name has changed if so delete.
if cfg.Node.Interface != newNode.Interface {
if err = wireguard.RemoveConf(cfg.Node.Interface, true); err != nil {
@ -336,10 +343,9 @@ func UpdatePeers(client mqtt.Client, msg mqtt.Message) {
ncutils.Log("error unmarshalling peer data")
return
}
// see if cache hit, if so skip
// see if cached hit, if so skip
var currentMessage = read(peerUpdate.Network, lastPeerUpdate)
if currentMessage == string(data) {
ncutils.Log("cache hit")
return
}
insert(peerUpdate.Network, lastPeerUpdate, string(data))
@ -567,6 +573,10 @@ func parseNetworkFromTopic(topic string) string {
}
func decryptMsg(cfg *config.ClientConfig, msg []byte) ([]byte, error) {
if len(msg) <= 24 { // make sure message is of appropriate length
return nil, fmt.Errorf("recieved invalid message from broker %s", string(msg))
}
// setup the keys
diskKey, keyErr := auth.RetrieveTrafficKey(cfg.Node.Network)
if keyErr != nil {