check broker connectivity on mqtt setup

This commit is contained in:
Matthew R. Kasun 2022-05-20 10:50:11 -04:00 committed by 0xdcarns
parent 5f0da691f3
commit 4b72a4e289
2 changed files with 19 additions and 10 deletions

View file

@ -167,7 +167,11 @@ func unsubscribeNode(client mqtt.Client, nodeCfg *config.ClientConfig) {
// the client should subscribe to ALL nodes that exist on server locally
func messageQueue(ctx context.Context, cfg *config.ClientConfig) {
logger.Log(0, "netclient daemon started for server: ", cfg.Server.Server)
client := setupMQTT(cfg, false)
client, err := setupMQTT(cfg, false)
if err != nil {
logger.Log(0, "unable to connect to broker", err.Error())
return
}
defer client.Disconnect(250)
<-ctx.Done()
logger.Log(0, "shutting down daemon for server ", cfg.Server.Server)
@ -201,7 +205,7 @@ func NewTLSConfig(server string) *tls.Config {
// setupMQTT creates a connection to broker and returns client
// this function is primarily used to create a connection to publish to the broker
func setupMQTT(cfg *config.ClientConfig, publish bool) mqtt.Client {
func setupMQTT(cfg *config.ClientConfig, publish bool) (mqtt.Client, error) {
opts := mqtt.NewClientOptions()
server := cfg.Server.Server
opts.AddBroker("ssl://" + server + ":8883") // TODO get the appropriate port of the comms mq server
@ -242,7 +246,9 @@ func setupMQTT(cfg *config.ClientConfig, publish bool) mqtt.Client {
} else {
err = token.Error()
}
checkBroker(cfg.Server.Server)
if err := checkBroker(cfg.Server.Server); err != nil {
return nil, err
}
logger.Log(0, "could not connect to broker", cfg.Server.Server, err.Error())
if strings.Contains(err.Error(), "connectex") || strings.Contains(err.Error(), "connect timeout") {
logger.Log(0, "connection issue detected.. attempt connection with new certs")
@ -257,7 +263,7 @@ func setupMQTT(cfg *config.ClientConfig, publish bool) mqtt.Client {
daemon.Restart()
}
}
return client
return client, nil
}
// publishes a message to server to update peers on this peer's behalf

View file

@ -111,7 +111,7 @@ func Hello(nodeCfg *config.ClientConfig) {
logger.Log(0, "could not run pull on "+nodeCfg.Node.Network+", error: "+err.Error())
}
}
logger.Log(3, "server checkin complete")
logger.Log(3, "checkin for", nodeCfg.Network, "complete")
}
// node cfg is required in order to fetch the traffic keys of that node for encryption
@ -126,7 +126,10 @@ func publish(nodeCfg *config.ClientConfig, dest string, msg []byte, qos byte) er
return err
}
client := setupMQTT(nodeCfg, true)
client, err := setupMQTT(nodeCfg, true)
if err != nil {
return fmt.Errorf("mq setup error %w", err)
}
defer client.Disconnect(250)
encrypted, err := ncutils.Chunk(msg, serverPubKey, trafficPrivKey)
if err != nil {
@ -142,7 +145,6 @@ func publish(nodeCfg *config.ClientConfig, dest string, msg []byte, qos byte) er
err = token.Error()
}
if err != nil {
checkBroker(nodeCfg.Server.Server)
return err
}
}
@ -165,10 +167,10 @@ func checkCertExpiry(cfg *config.ClientConfig) error {
return nil
}
func checkBroker(broker string) {
func checkBroker(broker string) error {
_, err := net.LookupIP(broker)
if err != nil {
logger.FatalLog("nslookup failed for broker ... check dns records")
return errors.New("nslookup failed for broker ... check dns records")
}
pinger := ping.NewTCPing()
pinger.SetTarget(&ping.Target{
@ -182,6 +184,7 @@ func checkBroker(broker string) {
pingerDone := pinger.Start()
<-pingerDone
if pinger.Result().SuccessCounter == 0 {
logger.FatalLog("unable to connect to broker port ... check firewalls")
return errors.New("unable to connect to broker port ... check netmaker server and firewalls")
}
return nil
}