From 9dedb479d12bb4f89cd101444c7f72dc1318a29e Mon Sep 17 00:00:00 2001 From: Matthew R Kasun Date: Wed, 26 Jan 2022 13:42:52 -0500 Subject: [PATCH] publish server keepalive and skeleton client subscription handler --- main.go | 4 ++++ mq/mq.go | 28 ++++++++++++++++++++++++++++ netclient/functions/daemon.go | 21 +++++++++++++++++++++ 3 files changed, 53 insertions(+) diff --git a/main.go b/main.go index 9b10d2ec..e7889685 100644 --- a/main.go +++ b/main.go @@ -199,9 +199,13 @@ func runMessageQueue(wg *sync.WaitGroup) { client.Disconnect(240) logger.Log(0, "node update subscription failed") } + //Set Up Keepalive message + ctx, cancel := context.WithCancel(context.Background()) + go mq.Keepalive(ctx) quit := make(chan os.Signal, 1) signal.Notify(quit, syscall.SIGTERM, os.Interrupt) <-quit + cancel() logger.Log(0, "Message Queue shutting down") client.Disconnect(250) } diff --git a/mq/mq.go b/mq/mq.go index a31d8254..5da6cc13 100644 --- a/mq/mq.go +++ b/mq/mq.go @@ -1,10 +1,12 @@ package mq import ( + "context" "encoding/json" "errors" "log" "strings" + "time" mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/gravitl/netmaker/database" @@ -14,6 +16,9 @@ import ( "github.com/gravitl/netmaker/servercfg" ) +const KEEPALIVE_TIMEOUT = 60 //timeout in seconds +const MQ_DISCONNECT = 250 + // DefaultHandler default message queue handler - only called when GetDebug == true var DefaultHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { logger.Log(0, "MQTT Message: Topic: ", string(msg.Topic()), " Message: ", string(msg.Payload())) @@ -156,3 +161,26 @@ func SetupMQTT() mqtt.Client { } return client } + +// Keepalive -- periodically pings all nodes to let them know server is still alive and doing well +func Keepalive(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case <-time.After(time.Second * KEEPALIVE_TIMEOUT): + nodes, err := logic.GetAllNodes() + if err != nil { + logger.Log(1, "error retrieving nodes for keepalive", err.Error()) + } + client := SetupMQTT() + for _, node := range nodes { + if token := client.Publish("serverkeepalive/"+node.ID, 0, false, servercfg.GetVersion()); token.Wait() && token.Error() != nil { + logger.Log(1, "error publishing server keepalive", token.Error().Error()) + } + client.Disconnect(MQ_DISCONNECT) + } + logger.Log(2, "keepalive sent to all nodes") + } + } +} diff --git a/netclient/functions/daemon.go b/netclient/functions/daemon.go index 1fbc51f7..3cb27843 100644 --- a/netclient/functions/daemon.go +++ b/netclient/functions/daemon.go @@ -19,6 +19,9 @@ import ( "golang.zx2c4.com/wireguard/wgctrl/wgtypes" ) +// ServerKeepalive - stores time of last server keepalive message +var KeepaliveReceived time.Time + // Daemon runs netclient daemon from command line func Daemon() error { ctx, cancel := context.WithCancel(context.Background()) @@ -58,6 +61,7 @@ func MessageQueue(ctx context.Context, network string) { cfg.Network = network cfg.ReadConfig() ncutils.Log("daemon started for network:" + network) + KeepaliveReceived = time.Now() client := SetupMQTT(&cfg) if cfg.DebugOn { if token := client.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil { @@ -77,6 +81,12 @@ func MessageQueue(ctx context.Context, network string) { if cfg.DebugOn { ncutils.Log("subscribed to node updates for node " + cfg.Node.Name + " update/peers/" + cfg.Node.ID) } + if token := client.Subscribe("serverkeepalive/"+cfg.Node.ID, 0, mqtt.MessageHandler(ServerKeepAlive)); token.Wait() && token.Error() != nil { + log.Fatal(token.Error()) + } + if cfg.DebugOn { + ncutils.Log("subscribed to server keepalives") + } defer client.Disconnect(250) go Checkin(ctx, &cfg, network) <-ctx.Done() @@ -196,6 +206,17 @@ var UpdatePeers mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) }() } +// ServerKeepAlive -- handler to react to keepalive messages published by server +func ServerKeepAlive(client mqtt.Client, msg mqtt.Message) { + if time.Now().Sub(KeepaliveReceived) < time.Second*200 { // more than 3+ minutes + + KeepaliveReceived = time.Now() + return + } + ncutils.Log("server keepalive not recieved in last 3 minutes") + ///do other stuff +} + // UpdateKeys -- updates private key and returns new publickey func UpdateKeys(cfg *config.ClientConfig, client mqtt.Client) error { ncutils.Log("received message to update keys")