configure mq on startup

This commit is contained in:
Abhishek Kondur 2022-09-26 16:57:10 +05:30
parent f509dffabb
commit 698a4be861
3 changed files with 115 additions and 52 deletions

10
main.go
View file

@ -139,6 +139,12 @@ func startControllers() {
logger.Log(0, "error occurred initializing DNS: ", err.Error()) logger.Log(0, "error occurred initializing DNS: ", err.Error())
} }
} }
if servercfg.IsMessageQueueBackend() {
if err := mq.Configure(); err != nil {
logger.FatalLog("failed to configure MQ: ", err.Error())
}
}
//Run Rest Server //Run Rest Server
if servercfg.IsRestBackend() { if servercfg.IsRestBackend() {
if !servercfg.DisableRemoteIPCheck() && servercfg.GetAPIHost() == "127.0.0.1" { if !servercfg.DisableRemoteIPCheck() && servercfg.GetAPIHost() == "127.0.0.1" {
@ -150,7 +156,6 @@ func startControllers() {
waitnetwork.Add(1) waitnetwork.Add(1)
go controller.HandleRESTRequests(&waitnetwork) go controller.HandleRESTRequests(&waitnetwork)
} }
//Run MessageQueue //Run MessageQueue
if servercfg.IsMessageQueueBackend() { if servercfg.IsMessageQueueBackend() {
waitnetwork.Add(1) waitnetwork.Add(1)
@ -169,8 +174,7 @@ func runMessageQueue(wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
brokerHost, secure := servercfg.GetMessageQueueEndpoint() brokerHost, secure := servercfg.GetMessageQueueEndpoint()
logger.Log(0, "connecting to mq broker at", brokerHost, "with TLS?", fmt.Sprintf("%v", secure)) logger.Log(0, "connecting to mq broker at", brokerHost, "with TLS?", fmt.Sprintf("%v", secure))
// update admin password and re-create client mq.SetUpAdminClient()
mq.Configure()
mq.SetupMQTT() mq.SetupMQTT()
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
go mq.DynamicSecManager(ctx) go mq.DynamicSecManager(ctx)

View file

@ -2,11 +2,18 @@ package mq
import ( import (
"context" "context"
"crypto/sha512"
"encoding/base64"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"os"
mqtt "github.com/eclipse/paho.mqtt.golang" mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/gravitl/netmaker/logger" "github.com/gravitl/netmaker/logger"
"github.com/gravitl/netmaker/logic"
"github.com/gravitl/netmaker/servercfg"
"golang.org/x/crypto/pbkdf2"
) )
const DynamicSecSubTopic = "$CONTROL/dynamic-security/#" const DynamicSecSubTopic = "$CONTROL/dynamic-security/#"
@ -14,14 +21,14 @@ const DynamicSecPubTopic = "$CONTROL/dynamic-security/v1"
type DynSecActionType string type DynSecActionType string
var mqAdminClient mqtt.Client
var ( var (
CreateClient DynSecActionType = "CREATE_CLIENT" CreateClient DynSecActionType = "CREATE_CLIENT"
DisableClient DynSecActionType = "DISABLE_CLIENT" DisableClient DynSecActionType = "DISABLE_CLIENT"
EnableClient DynSecActionType = "ENABLE_CLIENT" EnableClient DynSecActionType = "ENABLE_CLIENT"
DeleteClient DynSecActionType = "DELETE_CLIENT" DeleteClient DynSecActionType = "DELETE_CLIENT"
CreateAdminClient DynSecActionType = "CREATE_ADMIN_CLIENT" ModifyClient DynSecActionType = "MODIFY_CLIENT"
ModifyClient DynSecActionType = "MODIFY_CLIENT"
DISABLE_EXISTING_ADMINS DynSecActionType = "DISABLE_EXISTING_ADMINS"
) )
var ( var (
@ -32,10 +39,43 @@ var (
) )
var ( var (
mqDynSecAdmin string = "Netmaker-Admin" mqAdminUserName string = "Netmaker-Admin"
adminPassword string = "Netmaker-Admin" mqNetmakerServerUserName string = "Netmaker-Server"
) )
type client struct {
Username string `json:"username"`
TextName string `json:"textName"`
Password string `json:"password"`
Salt string `json:"salt"`
Iterations int `json:"iterations"`
Roles []struct {
Rolename string `json:"rolename"`
} `json:"roles"`
}
type role struct {
Rolename string `json:"rolename"`
Acls []struct {
Acltype string `json:"acltype"`
Topic string `json:"topic"`
Allow bool `json:"allow"`
} `json:"acls"`
}
type defaultAccessAcl struct {
PublishClientSend bool `json:"publishClientSend"`
PublishClientReceive bool `json:"publishClientReceive"`
Subscribe bool `json:"subscribe"`
Unsubscribe bool `json:"unsubscribe"`
}
type dynCnf struct {
Clients []client `json:"clients"`
Roles []role `json:"roles"`
DefaultACLAccess defaultAccessAcl `json:"defaultACLAccess"`
}
type MqDynSecGroup struct { type MqDynSecGroup struct {
Groupname string `json:"groupname"` Groupname string `json:"groupname"`
Priority int `json:"priority"` Priority int `json:"priority"`
@ -77,6 +117,39 @@ type MqDynsecPayload struct {
var DynSecChan = make(chan DynSecAction, 100) var DynSecChan = make(chan DynSecAction, 100)
func encodePasswordToPBKDF2(password string, salt string, iterations int, keyLength int) string {
binaryEncoded := pbkdf2.Key([]byte(password), []byte(salt), iterations, keyLength, sha512.New)
return base64.StdEncoding.EncodeToString(binaryEncoded)
}
func Configure() error {
file := "/root/dynamic-security.json"
b, err := os.ReadFile(file)
if err != nil {
return err
}
c := dynCnf{}
json.Unmarshal(b, &c)
password := servercfg.GetMqAdminPassword()
if password == "" {
return errors.New("MQ admin password not provided")
}
for i, cI := range c.Clients {
if cI.Username == mqAdminUserName || cI.Username == mqNetmakerServerUserName {
salt := logic.RandomString(12)
hashed := encodePasswordToPBKDF2(password, salt, 101, 64)
cI.Password = hashed
cI.Salt = base64.StdEncoding.EncodeToString([]byte(salt))
c.Clients[i] = cI
}
}
data, err := json.MarshalIndent(c, "", " ")
if err != nil {
return err
}
return os.WriteFile(file, data, 0755)
}
func DynamicSecManager(ctx context.Context) { func DynamicSecManager(ctx context.Context) {
defer close(DynSecChan) defer close(DynSecChan)
for { for {

View file

@ -2,12 +2,10 @@ package mq
import ( import (
"context" "context"
"encoding/json"
"time" "time"
mqtt "github.com/eclipse/paho.mqtt.golang" mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/gravitl/netmaker/logger" "github.com/gravitl/netmaker/logger"
"github.com/gravitl/netmaker/logic"
"github.com/gravitl/netmaker/netclient/ncutils" "github.com/gravitl/netmaker/netclient/ncutils"
"github.com/gravitl/netmaker/servercfg" "github.com/gravitl/netmaker/servercfg"
) )
@ -24,29 +22,28 @@ var peer_force_send = 0
var mqclient mqtt.Client var mqclient mqtt.Client
func Configure() { func SetUpAdminClient() {
opts := mqtt.NewClientOptions() opts := mqtt.NewClientOptions()
broker, _ := servercfg.GetMessageQueueEndpoint() setMqOptions(mqAdminUserName, servercfg.GetMqAdminPassword(), opts)
opts.AddBroker(broker) mqAdminClient = mqtt.NewClient(opts)
id := ncutils.MakeRandomString(23) opts.SetOnConnectHandler(func(client mqtt.Client) {
opts.ClientID = id if token := client.Subscribe(DynamicSecSubTopic, 0, mqtt.MessageHandler(watchDynSecTopic)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
opts.SetUsername(mqDynSecAdmin) client.Disconnect(240)
opts.SetPassword(adminPassword) logger.Log(0, "Dynamic security client subscription failed")
opts.SetAutoReconnect(true) }
opts.SetConnectRetry(true)
opts.SetConnectRetryInterval(time.Second << 2) opts.SetOrderMatters(true)
opts.SetKeepAlive(time.Minute) opts.SetResumeSubs(true)
opts.SetWriteTimeout(time.Minute) })
mqclient := mqtt.NewClient(opts)
tperiod := time.Now().Add(10 * time.Second) tperiod := time.Now().Add(10 * time.Second)
for { for {
if token := mqclient.Connect(); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil { if token := mqAdminClient.Connect(); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil {
logger.Log(2, "unable to connect to broker, retrying ...") logger.Log(2, "Admin: unable to connect to broker, retrying ...")
if time.Now().After(tperiod) { if time.Now().After(tperiod) {
if token.Error() == nil { if token.Error() == nil {
logger.FatalLog("could not connect to broker, token timeout, exiting ...") logger.FatalLog("Admin: could not connect to broker, token timeout, exiting ...")
} else { } else {
logger.FatalLog("could not connect to broker, exiting ...", token.Error().Error()) logger.FatalLog("Admin: could not connect to broker, exiting ...", token.Error().Error())
} }
} }
} else { } else {
@ -54,38 +51,27 @@ func Configure() {
} }
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
} }
newAdminPassword := logic.GenKey()
payload := MqDynsecPayload{
Commands: []MqDynSecCmd{
{
Command: ModifyClientCmd,
Username: mqDynSecAdmin,
Password: newAdminPassword,
},
},
}
d, _ := json.Marshal(payload)
if token := mqclient.Publish(DynamicSecPubTopic, 0, true, d); token.Error() != nil {
logger.FatalLog("failed to modify admin password: ", token.Error().Error())
}
mqclient.Disconnect(2)
adminPassword = newAdminPassword
} }
// SetupMQTT creates a connection to broker and return client func setMqOptions(user, password string, opts *mqtt.ClientOptions) {
func SetupMQTT() {
opts := mqtt.NewClientOptions()
broker, _ := servercfg.GetMessageQueueEndpoint() broker, _ := servercfg.GetMessageQueueEndpoint()
opts.AddBroker(broker) opts.AddBroker(broker)
id := ncutils.MakeRandomString(23) id := ncutils.MakeRandomString(23)
opts.ClientID = id opts.ClientID = id
opts.SetUsername(mqDynSecAdmin) opts.SetUsername(user)
opts.SetPassword(adminPassword) opts.SetPassword(password)
opts.SetAutoReconnect(true) opts.SetAutoReconnect(true)
opts.SetConnectRetry(true) opts.SetConnectRetry(true)
opts.SetConnectRetryInterval(time.Second << 2) opts.SetConnectRetryInterval(time.Second << 2)
opts.SetKeepAlive(time.Minute) opts.SetKeepAlive(time.Minute)
opts.SetWriteTimeout(time.Minute) opts.SetWriteTimeout(time.Minute)
}
// SetupMQTT creates a connection to broker and return client
func SetupMQTT() {
opts := mqtt.NewClientOptions()
setMqOptions(mqNetmakerServerUserName, servercfg.GetMqAdminPassword(), opts)
opts.SetOnConnectHandler(func(client mqtt.Client) { opts.SetOnConnectHandler(func(client mqtt.Client) {
if token := client.Subscribe("ping/#", 2, mqtt.MessageHandler(Ping)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil { if token := client.Subscribe("ping/#", 2, mqtt.MessageHandler(Ping)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
client.Disconnect(240) client.Disconnect(240)