Merge pull request #2122 from gravitl/refactor_unique_server_topic

add server name to common topics
This commit is contained in:
dcarns 2023-03-20 16:56:02 -04:00 committed by GitHub
commit e17505979a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -2,6 +2,7 @@ package mq
import (
"context"
"fmt"
"log"
"time"
@ -53,23 +54,24 @@ func SetupMQTT() {
opts := mqtt.NewClientOptions()
setMqOptions(servercfg.GetMqUserName(), servercfg.GetMqPassword(), opts)
opts.SetOnConnectHandler(func(client mqtt.Client) {
if token := client.Subscribe("ping/#", 2, mqtt.MessageHandler(Ping)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
serverName := servercfg.GetServer()
if token := client.Subscribe(fmt.Sprintf("ping/%s/#", serverName), 2, mqtt.MessageHandler(Ping)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
client.Disconnect(240)
logger.Log(0, "ping subscription failed")
}
if token := client.Subscribe("update/#", 0, mqtt.MessageHandler(UpdateNode)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
if token := client.Subscribe(fmt.Sprintf("update/%s/#", serverName), 0, mqtt.MessageHandler(UpdateNode)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
client.Disconnect(240)
logger.Log(0, "node update subscription failed")
}
if token := client.Subscribe("host/serverupdate/#", 0, mqtt.MessageHandler(UpdateHost)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
if token := client.Subscribe(fmt.Sprintf("host/serverupdate/%s/#", serverName), 0, mqtt.MessageHandler(UpdateHost)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
client.Disconnect(240)
logger.Log(0, "host update subscription failed")
}
if token := client.Subscribe("signal/#", 0, mqtt.MessageHandler(ClientPeerUpdate)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
if token := client.Subscribe(fmt.Sprintf("signal/%s/#", serverName), 0, mqtt.MessageHandler(ClientPeerUpdate)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
client.Disconnect(240)
logger.Log(0, "node client subscription failed")
}
if token := client.Subscribe("metrics/#", 0, mqtt.MessageHandler(UpdateMetrics)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
if token := client.Subscribe(fmt.Sprintf("metrics/%s/#", serverName), 0, mqtt.MessageHandler(UpdateMetrics)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
client.Disconnect(240)
logger.Log(0, "node metrics subscription failed")
}