diff --git a/main.go b/main.go index 778f9a09..289f9cdf 100644 --- a/main.go +++ b/main.go @@ -169,6 +169,8 @@ func runMessageQueue(wg *sync.WaitGroup) { defer wg.Done() brokerHost, secure := servercfg.GetMessageQueueEndpoint() logger.Log(0, "connecting to mq broker at", brokerHost, "with TLS?", fmt.Sprintf("%v", secure)) + // update admin password and re-create client + mq.Configure() mq.SetupMQTT() ctx, cancel := context.WithCancel(context.Background()) go mq.DynamicSecManager(ctx) diff --git a/mq/dynsec.go b/mq/dynsec.go index 5f169537..4f9d75ab 100644 --- a/mq/dynsec.go +++ b/mq/dynsec.go @@ -31,8 +31,10 @@ var ( ModifyClientCmd = "modifyClient" ) -const mqDynSecAdmin = "Netmaker-Admin" -const defaultAdminPassword = "Netmaker-Admin" +var ( + mqDynSecAdmin string = "Netmaker-Admin" + adminPassword string = "Netmaker-Admin" +) type MqDynSecGroup struct { Groupname string `json:"groupname"` @@ -76,7 +78,7 @@ type MqDynsecPayload struct { var DynSecChan = make(chan DynSecAction, 100) func DynamicSecManager(ctx context.Context) { - + defer close(DynSecChan) for { select { case <-ctx.Done(): diff --git a/mq/mq.go b/mq/mq.go index c865ad76..c1a1d904 100644 --- a/mq/mq.go +++ b/mq/mq.go @@ -2,10 +2,12 @@ package mq import ( "context" + "encoding/json" "time" mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/gravitl/netmaker/logger" + "github.com/gravitl/netmaker/logic" "github.com/gravitl/netmaker/netclient/ncutils" "github.com/gravitl/netmaker/servercfg" ) @@ -22,6 +24,53 @@ var peer_force_send = 0 var mqclient mqtt.Client +func Configure() { + opts := mqtt.NewClientOptions() + broker, _ := servercfg.GetMessageQueueEndpoint() + opts.AddBroker(broker) + id := ncutils.MakeRandomString(23) + opts.ClientID = id + opts.SetUsername(mqDynSecAdmin) + opts.SetPassword(adminPassword) + opts.SetAutoReconnect(true) + opts.SetConnectRetry(true) + opts.SetConnectRetryInterval(time.Second << 2) + opts.SetKeepAlive(time.Minute) + opts.SetWriteTimeout(time.Minute) + mqclient := mqtt.NewClient(opts) + tperiod := time.Now().Add(10 * time.Second) + for { + if token := mqclient.Connect(); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil { + logger.Log(2, "unable to connect to broker, retrying ...") + if time.Now().After(tperiod) { + if token.Error() == nil { + logger.FatalLog("could not connect to broker, token timeout, exiting ...") + } else { + logger.FatalLog("could not connect to broker, exiting ...", token.Error().Error()) + } + } + } else { + break + } + time.Sleep(2 * time.Second) + } + newAdminPassword := logic.GenKey() + payload := MqDynsecPayload{ + Commands: []MqDynSecCmd{ + { + Command: ModifyClientCmd, + Username: mqDynSecAdmin, + Password: newAdminPassword, + }, + }, + } + d, _ := json.Marshal(payload) + if token := mqclient.Publish(DynamicSecPubTopic, 0, true, d); token.Error() != nil { + logger.FatalLog("failed to modify admin password: ", token.Error().Error()) + } + adminPassword = newAdminPassword +} + // SetupMQTT creates a connection to broker and return client func SetupMQTT() { opts := mqtt.NewClientOptions() @@ -30,7 +79,7 @@ func SetupMQTT() { id := ncutils.MakeRandomString(23) opts.ClientID = id opts.SetUsername(mqDynSecAdmin) - opts.SetPassword(defaultAdminPassword) + opts.SetPassword(adminPassword) opts.SetAutoReconnect(true) opts.SetConnectRetry(true) opts.SetConnectRetryInterval(time.Second << 2)