mirror of
https://github.com/gravitl/netmaker.git
synced 2025-09-18 02:54:31 +08:00
Merge pull request #1495 from gravitl/feature_v0.15.1_reuse_mq_connection
Feature v0.15.1 reuse mq connection
This commit is contained in:
commit
2fb0056ce5
7 changed files with 52 additions and 63 deletions
3
main.go
3
main.go
|
@ -171,7 +171,7 @@ func runMessageQueue(wg *sync.WaitGroup) {
|
|||
defer wg.Done()
|
||||
brokerHost, secure := servercfg.GetMessageQueueEndpoint()
|
||||
logger.Log(0, "connecting to mq broker at", brokerHost, "with TLS?", fmt.Sprintf("%v", secure))
|
||||
var client = mq.SetupMQTT(false) // Set up the subscription listener
|
||||
mq.SetupMQTT()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
go mq.Keepalive(ctx)
|
||||
go logic.ManageZombies(ctx)
|
||||
|
@ -180,7 +180,6 @@ func runMessageQueue(wg *sync.WaitGroup) {
|
|||
<-quit
|
||||
cancel()
|
||||
logger.Log(0, "Message Queue shutting down")
|
||||
client.Disconnect(250)
|
||||
}
|
||||
|
||||
func setVerbosity() {
|
||||
|
|
42
mq/mq.go
42
mq/mq.go
|
@ -21,8 +21,10 @@ const MQ_TIMEOUT = 30
|
|||
|
||||
var peer_force_send = 0
|
||||
|
||||
var mqclient mqtt.Client
|
||||
|
||||
// SetupMQTT creates a connection to broker and return client
|
||||
func SetupMQTT(publish bool) mqtt.Client {
|
||||
func SetupMQTT() {
|
||||
opts := mqtt.NewClientOptions()
|
||||
broker, secure := servercfg.GetMessageQueueEndpoint()
|
||||
opts.AddBroker(broker)
|
||||
|
@ -37,28 +39,26 @@ func SetupMQTT(publish bool) mqtt.Client {
|
|||
opts.SetKeepAlive(time.Minute)
|
||||
opts.SetWriteTimeout(time.Minute)
|
||||
opts.SetOnConnectHandler(func(client mqtt.Client) {
|
||||
if !publish {
|
||||
if token := client.Subscribe("ping/#", 2, mqtt.MessageHandler(Ping)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
|
||||
client.Disconnect(240)
|
||||
logger.Log(0, "ping subscription failed")
|
||||
}
|
||||
if token := client.Subscribe("update/#", 0, mqtt.MessageHandler(UpdateNode)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
|
||||
client.Disconnect(240)
|
||||
logger.Log(0, "node update subscription failed")
|
||||
}
|
||||
if token := client.Subscribe("signal/#", 0, mqtt.MessageHandler(ClientPeerUpdate)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
|
||||
client.Disconnect(240)
|
||||
logger.Log(0, "node client subscription failed")
|
||||
}
|
||||
|
||||
opts.SetOrderMatters(true)
|
||||
opts.SetResumeSubs(true)
|
||||
if token := client.Subscribe("ping/#", 2, mqtt.MessageHandler(Ping)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
|
||||
client.Disconnect(240)
|
||||
logger.Log(0, "ping subscription failed")
|
||||
}
|
||||
if token := client.Subscribe("update/#", 0, mqtt.MessageHandler(UpdateNode)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
|
||||
client.Disconnect(240)
|
||||
logger.Log(0, "node update subscription failed")
|
||||
}
|
||||
if token := client.Subscribe("signal/#", 0, mqtt.MessageHandler(ClientPeerUpdate)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
|
||||
client.Disconnect(240)
|
||||
logger.Log(0, "node client subscription failed")
|
||||
}
|
||||
|
||||
opts.SetOrderMatters(true)
|
||||
opts.SetResumeSubs(true)
|
||||
})
|
||||
client := mqtt.NewClient(opts)
|
||||
mqclient = mqtt.NewClient(opts)
|
||||
tperiod := time.Now().Add(10 * time.Second)
|
||||
for {
|
||||
if token := client.Connect(); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil {
|
||||
if token := mqclient.Connect(); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil {
|
||||
logger.Log(2, "unable to connect to broker, retrying ...")
|
||||
if time.Now().After(tperiod) {
|
||||
if token.Error() == nil {
|
||||
|
@ -72,10 +72,6 @@ func SetupMQTT(publish bool) mqtt.Client {
|
|||
}
|
||||
time.Sleep(2 * time.Second)
|
||||
}
|
||||
if !publish {
|
||||
logger.Log(0, "successfully connected to mq broker")
|
||||
}
|
||||
return client
|
||||
}
|
||||
|
||||
// Keepalive -- periodically pings all nodes to let them know server is still alive and doing well
|
||||
|
|
|
@ -61,13 +61,14 @@ func encryptMsg(node *models.Node, msg []byte) ([]byte, error) {
|
|||
}
|
||||
|
||||
func publish(node *models.Node, dest string, msg []byte) error {
|
||||
client := SetupMQTT(true)
|
||||
defer client.Disconnect(250)
|
||||
encrypted, encryptErr := encryptMsg(node, msg)
|
||||
if encryptErr != nil {
|
||||
return encryptErr
|
||||
}
|
||||
if token := client.Publish(dest, 0, true, encrypted); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil {
|
||||
if mqclient == nil {
|
||||
return errors.New("cannot publish ... mqclient not connected")
|
||||
}
|
||||
if token := mqclient.Publish(dest, 0, true, encrypted); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil {
|
||||
var err error
|
||||
if token.Error() == nil {
|
||||
err = errors.New("connection timeout")
|
||||
|
@ -79,7 +80,7 @@ func publish(node *models.Node, dest string, msg []byte) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// decodes a message queue topic and returns the embedded node.ID
|
||||
// decodes a message queue topic and returns the embedded node.ID
|
||||
func getID(topic string) (string, error) {
|
||||
parts := strings.Split(topic, "/")
|
||||
count := len(parts)
|
||||
|
|
|
@ -34,6 +34,8 @@ var messageCache = new(sync.Map)
|
|||
|
||||
var serverSet map[string]bool
|
||||
|
||||
var mqclient mqtt.Client
|
||||
|
||||
const lastNodeUpdate = "lnu"
|
||||
const lastPeerUpdate = "lpu"
|
||||
|
||||
|
@ -192,12 +194,12 @@ func unsubscribeNode(client mqtt.Client, nodeCfg *config.ClientConfig) {
|
|||
func messageQueue(ctx context.Context, wg *sync.WaitGroup, cfg *config.ClientConfig) {
|
||||
defer wg.Done()
|
||||
logger.Log(0, "network:", cfg.Node.Network, "netclient message queue started for server:", cfg.Server.Server)
|
||||
client, err := setupMQTT(cfg, false)
|
||||
err := setupMQTT(cfg)
|
||||
if err != nil {
|
||||
logger.Log(0, "unable to connect to broker", cfg.Server.Server, err.Error())
|
||||
return
|
||||
}
|
||||
defer client.Disconnect(250)
|
||||
//defer mqclient.Disconnect(250)
|
||||
<-ctx.Done()
|
||||
logger.Log(0, "shutting down message queue for server", cfg.Server.Server)
|
||||
}
|
||||
|
@ -232,7 +234,7 @@ func NewTLSConfig(server string) (*tls.Config, error) {
|
|||
|
||||
// setupMQTT creates a connection to broker and returns client
|
||||
// this function is primarily used to create a connection to publish to the broker
|
||||
func setupMQTT(cfg *config.ClientConfig, publish bool) (mqtt.Client, error) {
|
||||
func setupMQTT(cfg *config.ClientConfig) error {
|
||||
opts := mqtt.NewClientOptions()
|
||||
server := cfg.Server.Server
|
||||
port := cfg.Server.MQPort
|
||||
|
@ -240,7 +242,7 @@ func setupMQTT(cfg *config.ClientConfig, publish bool) (mqtt.Client, error) {
|
|||
tlsConfig, err := NewTLSConfig(server)
|
||||
if err != nil {
|
||||
logger.Log(0, "failed to get TLS config for", server, err.Error())
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
opts.SetTLSConfig(tlsConfig)
|
||||
opts.SetClientID(ncutils.MakeRandomString(23))
|
||||
|
@ -252,17 +254,15 @@ func setupMQTT(cfg *config.ClientConfig, publish bool) (mqtt.Client, error) {
|
|||
opts.SetWriteTimeout(time.Minute)
|
||||
|
||||
opts.SetOnConnectHandler(func(client mqtt.Client) {
|
||||
if !publish {
|
||||
networks, err := ncutils.GetSystemNetworks()
|
||||
if err != nil {
|
||||
logger.Log(0, "error retriving networks", err.Error())
|
||||
}
|
||||
for _, network := range networks {
|
||||
var currNodeCfg config.ClientConfig
|
||||
currNodeCfg.Network = network
|
||||
currNodeCfg.ReadConfig()
|
||||
setSubscriptions(client, &currNodeCfg)
|
||||
}
|
||||
networks, err := ncutils.GetSystemNetworks()
|
||||
if err != nil {
|
||||
logger.Log(0, "error retriving networks", err.Error())
|
||||
}
|
||||
for _, network := range networks {
|
||||
var currNodeCfg config.ClientConfig
|
||||
currNodeCfg.Network = network
|
||||
currNodeCfg.ReadConfig()
|
||||
setSubscriptions(client, &currNodeCfg)
|
||||
}
|
||||
})
|
||||
opts.SetOrderMatters(true)
|
||||
|
@ -270,11 +270,11 @@ func setupMQTT(cfg *config.ClientConfig, publish bool) (mqtt.Client, error) {
|
|||
opts.SetConnectionLostHandler(func(c mqtt.Client, e error) {
|
||||
logger.Log(0, "network:", cfg.Node.Network, "detected broker connection lost for", cfg.Server.Server)
|
||||
})
|
||||
client := mqtt.NewClient(opts)
|
||||
mqclient = mqtt.NewClient(opts)
|
||||
var connecterr error
|
||||
for count := 0; count < 3; count++ {
|
||||
connecterr = nil
|
||||
if token := client.Connect(); !token.WaitTimeout(30*time.Second) || token.Error() != nil {
|
||||
if token := mqclient.Connect(); !token.WaitTimeout(30*time.Second) || token.Error() != nil {
|
||||
logger.Log(0, "unable to connect to broker, retrying ...")
|
||||
if token.Error() == nil {
|
||||
connecterr = errors.New("connect timeout")
|
||||
|
@ -289,12 +289,12 @@ func setupMQTT(cfg *config.ClientConfig, publish bool) (mqtt.Client, error) {
|
|||
if connecterr != nil {
|
||||
reRegisterWithServer(cfg)
|
||||
//try after re-registering
|
||||
if token := client.Connect(); !token.WaitTimeout(30*time.Second) || token.Error() != nil {
|
||||
return client, errors.New("unable to connect to broker")
|
||||
if token := mqclient.Connect(); !token.WaitTimeout(30*time.Second) || token.Error() != nil {
|
||||
return errors.New("unable to connect to broker")
|
||||
}
|
||||
}
|
||||
|
||||
return client, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func reRegisterWithServer(cfg *config.ClientConfig) {
|
||||
|
|
|
@ -218,11 +218,6 @@ func JoinNetwork(cfg *config.ClientConfig, privateKey string) error {
|
|||
if cfg.Server.Server == "" {
|
||||
return errors.New("did not receive broker address from registration")
|
||||
}
|
||||
// update server with latest data
|
||||
if err := PublishNodeUpdate(cfg); err != nil {
|
||||
logger.Log(0, "network:", cfg.Network, "failed to publish update for join", err.Error())
|
||||
}
|
||||
|
||||
if cfg.Daemon == "install" || ncutils.IsFreeBSD() {
|
||||
err = daemon.InstallDaemon()
|
||||
if err != nil {
|
||||
|
|
|
@ -44,7 +44,7 @@ func UpdateLocalListenPort(nodeCfg *config.ClientConfig) error {
|
|||
return err
|
||||
}
|
||||
if err := PublishNodeUpdate(nodeCfg); err != nil {
|
||||
logger.Log(0, "could not publish local port change")
|
||||
logger.Log(0, "could not publish local port change", err.Error())
|
||||
}
|
||||
}
|
||||
return err
|
||||
|
|
|
@ -21,7 +21,8 @@ import (
|
|||
)
|
||||
|
||||
// Checkin -- go routine that checks for public or local ip changes, publishes changes
|
||||
// if there are no updates, simply "pings" the server as a checkin
|
||||
//
|
||||
// if there are no updates, simply "pings" the server as a checkin
|
||||
func Checkin(ctx context.Context, wg *sync.WaitGroup) {
|
||||
logger.Log(2, "starting checkin goroutine")
|
||||
defer wg.Done()
|
||||
|
@ -141,17 +142,14 @@ func publish(nodeCfg *config.ClientConfig, dest string, msg []byte, qos byte) er
|
|||
return err
|
||||
}
|
||||
|
||||
client, err := setupMQTT(nodeCfg, true)
|
||||
if err != nil {
|
||||
return fmt.Errorf("mq setup error %w", err)
|
||||
}
|
||||
defer client.Disconnect(250)
|
||||
encrypted, err := ncutils.Chunk(msg, serverPubKey, trafficPrivKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if token := client.Publish(dest, qos, false, encrypted); !token.WaitTimeout(30*time.Second) || token.Error() != nil {
|
||||
if mqclient == nil {
|
||||
return errors.New("unable to publish ... no mqclient")
|
||||
}
|
||||
if token := mqclient.Publish(dest, qos, false, encrypted); !token.WaitTimeout(30*time.Second) || token.Error() != nil {
|
||||
logger.Log(0, "could not connect to broker at "+nodeCfg.Server.Server+":"+nodeCfg.Server.MQPort)
|
||||
var err error
|
||||
if token.Error() == nil {
|
||||
|
|
Loading…
Add table
Reference in a new issue