From 49c6380643459bb41d7d566e66d765eaac3312d2 Mon Sep 17 00:00:00 2001 From: "Matthew R. Kasun" Date: Mon, 29 Aug 2022 14:08:01 -0400 Subject: [PATCH] reuse mq connections --- main.go | 3 +-- mq/mq.go | 42 +++++++++++++++----------------- mq/util.go | 9 ++++--- netclient/functions/daemon.go | 41 ++++++++++++++++--------------- netclient/functions/join.go | 5 ---- netclient/functions/localport.go | 2 +- netclient/functions/mqpublish.go | 14 +++++------ 7 files changed, 53 insertions(+), 63 deletions(-) diff --git a/main.go b/main.go index adda6c96..f79e259d 100644 --- a/main.go +++ b/main.go @@ -171,7 +171,7 @@ func runMessageQueue(wg *sync.WaitGroup) { defer wg.Done() brokerHost, secure := servercfg.GetMessageQueueEndpoint() logger.Log(0, "connecting to mq broker at", brokerHost, "with TLS?", fmt.Sprintf("%v", secure)) - var client = mq.SetupMQTT(false) // Set up the subscription listener + mq.SetupMQTT() ctx, cancel := context.WithCancel(context.Background()) go mq.Keepalive(ctx) go logic.ManageZombies(ctx) @@ -180,7 +180,6 @@ func runMessageQueue(wg *sync.WaitGroup) { <-quit cancel() logger.Log(0, "Message Queue shutting down") - client.Disconnect(250) } func setVerbosity() { diff --git a/mq/mq.go b/mq/mq.go index 4b83fb83..dcc0a861 100644 --- a/mq/mq.go +++ b/mq/mq.go @@ -21,8 +21,10 @@ const MQ_TIMEOUT = 30 var peer_force_send = 0 +var mqclient mqtt.Client + // SetupMQTT creates a connection to broker and return client -func SetupMQTT(publish bool) mqtt.Client { +func SetupMQTT() { opts := mqtt.NewClientOptions() broker, secure := servercfg.GetMessageQueueEndpoint() opts.AddBroker(broker) @@ -37,28 +39,26 @@ func SetupMQTT(publish bool) mqtt.Client { opts.SetKeepAlive(time.Minute) opts.SetWriteTimeout(time.Minute) opts.SetOnConnectHandler(func(client mqtt.Client) { - if !publish { - if token := client.Subscribe("ping/#", 2, mqtt.MessageHandler(Ping)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil { - client.Disconnect(240) - logger.Log(0, "ping subscription failed") - } - if token := client.Subscribe("update/#", 0, mqtt.MessageHandler(UpdateNode)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil { - client.Disconnect(240) - logger.Log(0, "node update subscription failed") - } - if token := client.Subscribe("signal/#", 0, mqtt.MessageHandler(ClientPeerUpdate)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil { - client.Disconnect(240) - logger.Log(0, "node client subscription failed") - } - - opts.SetOrderMatters(true) - opts.SetResumeSubs(true) + if token := client.Subscribe("ping/#", 2, mqtt.MessageHandler(Ping)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil { + client.Disconnect(240) + logger.Log(0, "ping subscription failed") } + if token := client.Subscribe("update/#", 0, mqtt.MessageHandler(UpdateNode)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil { + client.Disconnect(240) + logger.Log(0, "node update subscription failed") + } + if token := client.Subscribe("signal/#", 0, mqtt.MessageHandler(ClientPeerUpdate)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil { + client.Disconnect(240) + logger.Log(0, "node client subscription failed") + } + + opts.SetOrderMatters(true) + opts.SetResumeSubs(true) }) - client := mqtt.NewClient(opts) + mqclient := mqtt.NewClient(opts) tperiod := time.Now().Add(10 * time.Second) for { - if token := client.Connect(); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil { + if token := mqclient.Connect(); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil { logger.Log(2, "unable to connect to broker, retrying ...") if time.Now().After(tperiod) { if token.Error() == nil { @@ -72,10 +72,6 @@ func SetupMQTT(publish bool) mqtt.Client { } time.Sleep(2 * time.Second) } - if !publish { - logger.Log(0, "successfully connected to mq broker") - } - return client } // Keepalive -- periodically pings all nodes to let them know server is still alive and doing well diff --git a/mq/util.go b/mq/util.go index a44e955b..e268b82f 100644 --- a/mq/util.go +++ b/mq/util.go @@ -61,13 +61,14 @@ func encryptMsg(node *models.Node, msg []byte) ([]byte, error) { } func publish(node *models.Node, dest string, msg []byte) error { - client := SetupMQTT(true) - defer client.Disconnect(250) encrypted, encryptErr := encryptMsg(node, msg) if encryptErr != nil { return encryptErr } - if token := client.Publish(dest, 0, true, encrypted); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil { + if mqclient == nil { + return errors.New("cannot publish ... mqclient not connected") + } + if token := mqclient.Publish(dest, 0, true, encrypted); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil { var err error if token.Error() == nil { err = errors.New("connection timeout") @@ -79,7 +80,7 @@ func publish(node *models.Node, dest string, msg []byte) error { return nil } -// decodes a message queue topic and returns the embedded node.ID +// decodes a message queue topic and returns the embedded node.ID func getID(topic string) (string, error) { parts := strings.Split(topic, "/") count := len(parts) diff --git a/netclient/functions/daemon.go b/netclient/functions/daemon.go index a454d9e9..8ae6624c 100644 --- a/netclient/functions/daemon.go +++ b/netclient/functions/daemon.go @@ -34,6 +34,8 @@ var messageCache = new(sync.Map) var serverSet map[string]bool +var mqclient mqtt.Client + const lastNodeUpdate = "lnu" const lastPeerUpdate = "lpu" @@ -192,12 +194,12 @@ func unsubscribeNode(client mqtt.Client, nodeCfg *config.ClientConfig) { func messageQueue(ctx context.Context, wg *sync.WaitGroup, cfg *config.ClientConfig) { defer wg.Done() logger.Log(0, "network:", cfg.Node.Network, "netclient message queue started for server:", cfg.Server.Server) - client, err := setupMQTT(cfg, false) + err := setupMQTT(cfg) if err != nil { logger.Log(0, "unable to connect to broker", cfg.Server.Server, err.Error()) return } - defer client.Disconnect(250) + //defer mqclient.Disconnect(250) <-ctx.Done() logger.Log(0, "shutting down message queue for server", cfg.Server.Server) } @@ -232,7 +234,7 @@ func NewTLSConfig(server string) (*tls.Config, error) { // setupMQTT creates a connection to broker and returns client // this function is primarily used to create a connection to publish to the broker -func setupMQTT(cfg *config.ClientConfig, publish bool) (mqtt.Client, error) { +func setupMQTT(cfg *config.ClientConfig) error { opts := mqtt.NewClientOptions() server := cfg.Server.Server port := cfg.Server.MQPort @@ -240,7 +242,7 @@ func setupMQTT(cfg *config.ClientConfig, publish bool) (mqtt.Client, error) { tlsConfig, err := NewTLSConfig(server) if err != nil { logger.Log(0, "failed to get TLS config for", server, err.Error()) - return nil, err + return err } opts.SetTLSConfig(tlsConfig) opts.SetClientID(ncutils.MakeRandomString(23)) @@ -252,17 +254,15 @@ func setupMQTT(cfg *config.ClientConfig, publish bool) (mqtt.Client, error) { opts.SetWriteTimeout(time.Minute) opts.SetOnConnectHandler(func(client mqtt.Client) { - if !publish { - networks, err := ncutils.GetSystemNetworks() - if err != nil { - logger.Log(0, "error retriving networks", err.Error()) - } - for _, network := range networks { - var currNodeCfg config.ClientConfig - currNodeCfg.Network = network - currNodeCfg.ReadConfig() - setSubscriptions(client, &currNodeCfg) - } + networks, err := ncutils.GetSystemNetworks() + if err != nil { + logger.Log(0, "error retriving networks", err.Error()) + } + for _, network := range networks { + var currNodeCfg config.ClientConfig + currNodeCfg.Network = network + currNodeCfg.ReadConfig() + setSubscriptions(client, &currNodeCfg) } }) opts.SetOrderMatters(true) @@ -270,11 +270,12 @@ func setupMQTT(cfg *config.ClientConfig, publish bool) (mqtt.Client, error) { opts.SetConnectionLostHandler(func(c mqtt.Client, e error) { logger.Log(0, "network:", cfg.Node.Network, "detected broker connection lost for", cfg.Server.Server) }) - client := mqtt.NewClient(opts) + mqclient = mqtt.NewClient(opts) + log.Println(mqclient) var connecterr error for count := 0; count < 3; count++ { connecterr = nil - if token := client.Connect(); !token.WaitTimeout(30*time.Second) || token.Error() != nil { + if token := mqclient.Connect(); !token.WaitTimeout(30*time.Second) || token.Error() != nil { logger.Log(0, "unable to connect to broker, retrying ...") if token.Error() == nil { connecterr = errors.New("connect timeout") @@ -289,12 +290,12 @@ func setupMQTT(cfg *config.ClientConfig, publish bool) (mqtt.Client, error) { if connecterr != nil { reRegisterWithServer(cfg) //try after re-registering - if token := client.Connect(); !token.WaitTimeout(30*time.Second) || token.Error() != nil { - return client, errors.New("unable to connect to broker") + if token := mqclient.Connect(); !token.WaitTimeout(30*time.Second) || token.Error() != nil { + return errors.New("unable to connect to broker") } } - return client, nil + return nil } func reRegisterWithServer(cfg *config.ClientConfig) { diff --git a/netclient/functions/join.go b/netclient/functions/join.go index cedff21b..8ab6470e 100644 --- a/netclient/functions/join.go +++ b/netclient/functions/join.go @@ -218,11 +218,6 @@ func JoinNetwork(cfg *config.ClientConfig, privateKey string) error { if cfg.Server.Server == "" { return errors.New("did not receive broker address from registration") } - // update server with latest data - if err := PublishNodeUpdate(cfg); err != nil { - logger.Log(0, "network:", cfg.Network, "failed to publish update for join", err.Error()) - } - if cfg.Daemon == "install" || ncutils.IsFreeBSD() { err = daemon.InstallDaemon() if err != nil { diff --git a/netclient/functions/localport.go b/netclient/functions/localport.go index d6290534..a628d380 100644 --- a/netclient/functions/localport.go +++ b/netclient/functions/localport.go @@ -44,7 +44,7 @@ func UpdateLocalListenPort(nodeCfg *config.ClientConfig) error { return err } if err := PublishNodeUpdate(nodeCfg); err != nil { - logger.Log(0, "could not publish local port change") + logger.Log(0, "could not publish local port change", err.Error()) } } return err diff --git a/netclient/functions/mqpublish.go b/netclient/functions/mqpublish.go index 23788dfb..9a37d564 100644 --- a/netclient/functions/mqpublish.go +++ b/netclient/functions/mqpublish.go @@ -21,7 +21,8 @@ import ( ) // Checkin -- go routine that checks for public or local ip changes, publishes changes -// if there are no updates, simply "pings" the server as a checkin +// +// if there are no updates, simply "pings" the server as a checkin func Checkin(ctx context.Context, wg *sync.WaitGroup) { logger.Log(2, "starting checkin goroutine") defer wg.Done() @@ -141,17 +142,14 @@ func publish(nodeCfg *config.ClientConfig, dest string, msg []byte, qos byte) er return err } - client, err := setupMQTT(nodeCfg, true) - if err != nil { - return fmt.Errorf("mq setup error %w", err) - } - defer client.Disconnect(250) encrypted, err := ncutils.Chunk(msg, serverPubKey, trafficPrivKey) if err != nil { return err } - - if token := client.Publish(dest, qos, false, encrypted); !token.WaitTimeout(30*time.Second) || token.Error() != nil { + if mqclient == nil { + return errors.New("unable to publish ... no mqclient") + } + if token := mqclient.Publish(dest, qos, false, encrypted); !token.WaitTimeout(30*time.Second) || token.Error() != nil { logger.Log(0, "could not connect to broker at "+nodeCfg.Server.Server+":"+nodeCfg.Server.MQPort) var err error if token.Error() == nil {