From 0e9c549d1b487b0dbea4a8d92f1648659d135eb7 Mon Sep 17 00:00:00 2001 From: Anish Mukherjee Date: Thu, 16 Mar 2023 16:39:41 +0530 Subject: [PATCH] add server name to common topics --- mq/mq.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/mq/mq.go b/mq/mq.go index b8e0d458..066927de 100644 --- a/mq/mq.go +++ b/mq/mq.go @@ -2,6 +2,7 @@ package mq import ( "context" + "fmt" "log" "time" @@ -53,23 +54,24 @@ func SetupMQTT() { opts := mqtt.NewClientOptions() setMqOptions(servercfg.GetMqUserName(), servercfg.GetMqPassword(), opts) opts.SetOnConnectHandler(func(client mqtt.Client) { - if token := client.Subscribe("ping/#", 2, mqtt.MessageHandler(Ping)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil { + serverName := servercfg.GetServer() + if token := client.Subscribe(fmt.Sprintf("ping/%s/#", serverName), 2, mqtt.MessageHandler(Ping)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil { client.Disconnect(240) logger.Log(0, "ping subscription failed") } - if token := client.Subscribe("update/#", 0, mqtt.MessageHandler(UpdateNode)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil { + if token := client.Subscribe(fmt.Sprintf("update/%s/#", serverName), 0, mqtt.MessageHandler(UpdateNode)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil { client.Disconnect(240) logger.Log(0, "node update subscription failed") } - if token := client.Subscribe("host/serverupdate/#", 0, mqtt.MessageHandler(UpdateHost)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil { + if token := client.Subscribe(fmt.Sprintf("host/serverupdate/%s/#", serverName), 0, mqtt.MessageHandler(UpdateHost)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil { client.Disconnect(240) logger.Log(0, "host update subscription failed") } - if token := client.Subscribe("signal/#", 0, mqtt.MessageHandler(ClientPeerUpdate)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil { + if token := client.Subscribe(fmt.Sprintf("signal/%s/#", serverName), 0, mqtt.MessageHandler(ClientPeerUpdate)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil { client.Disconnect(240) logger.Log(0, "node client subscription failed") } - if token := client.Subscribe("metrics/#", 0, mqtt.MessageHandler(UpdateMetrics)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil { + if token := client.Subscribe(fmt.Sprintf("metrics/%s/#", serverName), 0, mqtt.MessageHandler(UpdateMetrics)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil { client.Disconnect(240) logger.Log(0, "node metrics subscription failed") }