publish server keepalive and skeleton client subscription handler

This commit is contained in:
Matthew R Kasun 2022-01-26 13:42:52 -05:00
parent 59685731c1
commit 9dedb479d1
3 changed files with 53 additions and 0 deletions

View file

@ -199,9 +199,13 @@ func runMessageQueue(wg *sync.WaitGroup) {
client.Disconnect(240) client.Disconnect(240)
logger.Log(0, "node update subscription failed") logger.Log(0, "node update subscription failed")
} }
//Set Up Keepalive message
ctx, cancel := context.WithCancel(context.Background())
go mq.Keepalive(ctx)
quit := make(chan os.Signal, 1) quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGTERM, os.Interrupt) signal.Notify(quit, syscall.SIGTERM, os.Interrupt)
<-quit <-quit
cancel()
logger.Log(0, "Message Queue shutting down") logger.Log(0, "Message Queue shutting down")
client.Disconnect(250) client.Disconnect(250)
} }

View file

@ -1,10 +1,12 @@
package mq package mq
import ( import (
"context"
"encoding/json" "encoding/json"
"errors" "errors"
"log" "log"
"strings" "strings"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang" mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/gravitl/netmaker/database" "github.com/gravitl/netmaker/database"
@ -14,6 +16,9 @@ import (
"github.com/gravitl/netmaker/servercfg" "github.com/gravitl/netmaker/servercfg"
) )
const KEEPALIVE_TIMEOUT = 60 //timeout in seconds
const MQ_DISCONNECT = 250
// DefaultHandler default message queue handler - only called when GetDebug == true // DefaultHandler default message queue handler - only called when GetDebug == true
var DefaultHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { var DefaultHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
logger.Log(0, "MQTT Message: Topic: ", string(msg.Topic()), " Message: ", string(msg.Payload())) logger.Log(0, "MQTT Message: Topic: ", string(msg.Topic()), " Message: ", string(msg.Payload()))
@ -156,3 +161,26 @@ func SetupMQTT() mqtt.Client {
} }
return client return client
} }
// Keepalive -- periodically pings all nodes to let them know server is still alive and doing well
func Keepalive(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-time.After(time.Second * KEEPALIVE_TIMEOUT):
nodes, err := logic.GetAllNodes()
if err != nil {
logger.Log(1, "error retrieving nodes for keepalive", err.Error())
}
client := SetupMQTT()
for _, node := range nodes {
if token := client.Publish("serverkeepalive/"+node.ID, 0, false, servercfg.GetVersion()); token.Wait() && token.Error() != nil {
logger.Log(1, "error publishing server keepalive", token.Error().Error())
}
client.Disconnect(MQ_DISCONNECT)
}
logger.Log(2, "keepalive sent to all nodes")
}
}
}

View file

@ -19,6 +19,9 @@ import (
"golang.zx2c4.com/wireguard/wgctrl/wgtypes" "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
) )
// ServerKeepalive - stores time of last server keepalive message
var KeepaliveReceived time.Time
// Daemon runs netclient daemon from command line // Daemon runs netclient daemon from command line
func Daemon() error { func Daemon() error {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
@ -58,6 +61,7 @@ func MessageQueue(ctx context.Context, network string) {
cfg.Network = network cfg.Network = network
cfg.ReadConfig() cfg.ReadConfig()
ncutils.Log("daemon started for network:" + network) ncutils.Log("daemon started for network:" + network)
KeepaliveReceived = time.Now()
client := SetupMQTT(&cfg) client := SetupMQTT(&cfg)
if cfg.DebugOn { if cfg.DebugOn {
if token := client.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil { if token := client.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil {
@ -77,6 +81,12 @@ func MessageQueue(ctx context.Context, network string) {
if cfg.DebugOn { if cfg.DebugOn {
ncutils.Log("subscribed to node updates for node " + cfg.Node.Name + " update/peers/" + cfg.Node.ID) ncutils.Log("subscribed to node updates for node " + cfg.Node.Name + " update/peers/" + cfg.Node.ID)
} }
if token := client.Subscribe("serverkeepalive/"+cfg.Node.ID, 0, mqtt.MessageHandler(ServerKeepAlive)); token.Wait() && token.Error() != nil {
log.Fatal(token.Error())
}
if cfg.DebugOn {
ncutils.Log("subscribed to server keepalives")
}
defer client.Disconnect(250) defer client.Disconnect(250)
go Checkin(ctx, &cfg, network) go Checkin(ctx, &cfg, network)
<-ctx.Done() <-ctx.Done()
@ -196,6 +206,17 @@ var UpdatePeers mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message)
}() }()
} }
// ServerKeepAlive -- handler to react to keepalive messages published by server
func ServerKeepAlive(client mqtt.Client, msg mqtt.Message) {
if time.Now().Sub(KeepaliveReceived) < time.Second*200 { // more than 3+ minutes
KeepaliveReceived = time.Now()
return
}
ncutils.Log("server keepalive not recieved in last 3 minutes")
///do other stuff
}
// UpdateKeys -- updates private key and returns new publickey // UpdateKeys -- updates private key and returns new publickey
func UpdateKeys(cfg *config.ClientConfig, client mqtt.Client) error { func UpdateKeys(cfg *config.ClientConfig, client mqtt.Client) error {
ncutils.Log("received message to update keys") ncutils.Log("received message to update keys")