mirror of
https://github.com/gravitl/netmaker.git
synced 2025-09-06 13:14:24 +08:00
rm mq dynamic security, add mq username,password to config
This commit is contained in:
parent
eee352fa4b
commit
81e9febf2a
10 changed files with 24 additions and 948 deletions
|
@ -71,7 +71,8 @@ type ServerConfig struct {
|
|||
Server string `yaml:"server"`
|
||||
Broker string `yam:"broker"`
|
||||
PublicIPService string `yaml:"publicipservice"`
|
||||
MQAdminPassword string `yaml:"mqadminpassword"`
|
||||
MQPassword string `yaml:"mqpassword"`
|
||||
MQUserName string `yaml:"mqusername"`
|
||||
MetricsExporter string `yaml:"metrics_exporter"`
|
||||
BasicAuth string `yaml:"basic_auth"`
|
||||
LicenseValue string `yaml:"license_value"`
|
||||
|
|
|
@ -99,17 +99,6 @@ func updateHost(w http.ResponseWriter, r *http.Request) {
|
|||
if updateRelay {
|
||||
logic.UpdateHostRelay(currHost.ID.String(), currHost.RelayedHosts, newHost.RelayedHosts)
|
||||
}
|
||||
|
||||
newNetworks := logic.GetHostNetworks(newHost.ID.String())
|
||||
if len(newNetworks) > 0 {
|
||||
if err = mq.ModifyClient(&mq.MqClient{
|
||||
ID: currHost.ID.String(),
|
||||
Text: currHost.Name,
|
||||
Networks: newNetworks,
|
||||
}); err != nil {
|
||||
logger.Log(0, r.Header.Get("user"), "failed to update host networks roles in DynSec:", err.Error())
|
||||
}
|
||||
}
|
||||
// publish host update through MQ
|
||||
if err := mq.HostUpdate(&models.HostUpdate{
|
||||
Action: models.UpdateHost,
|
||||
|
@ -163,10 +152,6 @@ func deleteHost(w http.ResponseWriter, r *http.Request) {
|
|||
logger.Log(0, r.Header.Get("user"), "failed to send delete host update: ", currHost.ID.String(), err.Error())
|
||||
}
|
||||
|
||||
if err = mq.DeleteMqClient(currHost.ID.String()); err != nil {
|
||||
logger.Log(0, "error removing DynSec credentials for host:", currHost.Name, err.Error())
|
||||
}
|
||||
|
||||
apiHostData := currHost.ConvertNMHostToAPI()
|
||||
logger.Log(2, r.Header.Get("user"), "removed host", currHost.Name)
|
||||
w.WriteHeader(http.StatusOK)
|
||||
|
@ -215,16 +200,6 @@ func addHostToNetwork(w http.ResponseWriter, r *http.Request) {
|
|||
}); err != nil {
|
||||
logger.Log(0, r.Header.Get("user"), "failed to update host to join network:", hostid, network, err.Error())
|
||||
}
|
||||
networks := logic.GetHostNetworks(currHost.ID.String())
|
||||
if len(networks) > 0 {
|
||||
if err = mq.ModifyClient(&mq.MqClient{
|
||||
ID: currHost.ID.String(),
|
||||
Text: currHost.Name,
|
||||
Networks: networks,
|
||||
}); err != nil {
|
||||
logger.Log(0, r.Header.Get("user"), "failed to update host networks roles in DynSec:", hostid, err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
logger.Log(2, r.Header.Get("user"), fmt.Sprintf("added host %s to network %s", currHost.Name, network))
|
||||
w.WriteHeader(http.StatusOK)
|
||||
|
|
|
@ -372,10 +372,6 @@ func deleteNetwork(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
if err := mq.DeleteNetworkRole(network); err != nil {
|
||||
logger.Log(0, fmt.Sprintf("failed to remove network DynSec role: %v", err.Error()))
|
||||
}
|
||||
|
||||
logger.Log(1, r.Header.Get("user"), "deleted network", network)
|
||||
w.WriteHeader(http.StatusOK)
|
||||
json.NewEncoder(w).Encode("success")
|
||||
|
@ -423,11 +419,6 @@ func createNetwork(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
if err = mq.CreateNetworkRole(network.NetID); err != nil {
|
||||
logger.Log(0, r.Header.Get("user"), "failed to create network DynSec role:",
|
||||
err.Error())
|
||||
}
|
||||
|
||||
if err = logic.AddDefaultHostsToNetwork(network.NetID, servercfg.GetServer()); err != nil {
|
||||
logger.Log(0, fmt.Sprintf("failed to add default hosts to network [%v]: %v",
|
||||
network.NetID, err.Error()))
|
||||
|
|
|
@ -576,8 +576,6 @@ func createNode(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
server := servercfg.GetServerInfo()
|
||||
server.TrafficKey = key
|
||||
// consume password before hashing for mq client creation
|
||||
hostPassword := data.Host.HostPass
|
||||
data.Node.Server = servercfg.GetServer()
|
||||
if err := logic.CreateHost(&data.Host); err != nil {
|
||||
if errors.Is(err, logic.ErrHostExists) {
|
||||
|
@ -589,34 +587,12 @@ func createNode(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
logic.UpdateHost(&data.Host, host) // update the in memory struct values
|
||||
networks := logic.GetHostNetworks(data.Host.ID.String())
|
||||
if err := mq.ModifyClient(&mq.MqClient{
|
||||
ID: data.Host.ID.String(),
|
||||
Text: data.Host.Name,
|
||||
Networks: networks,
|
||||
}); err != nil {
|
||||
logger.Log(0, fmt.Sprintf("failed to modify DynSec client: %v", err.Error()))
|
||||
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
|
||||
return
|
||||
}
|
||||
|
||||
} else {
|
||||
logger.Log(0, "error creating host", err.Error())
|
||||
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest"))
|
||||
return
|
||||
}
|
||||
} else {
|
||||
// Create client for this host in Mq
|
||||
if err := mq.CreateMqClient(&mq.MqClient{
|
||||
ID: data.Host.ID.String(),
|
||||
Text: data.Host.Name,
|
||||
Password: hostPassword,
|
||||
Networks: []string{networkName},
|
||||
}); err != nil {
|
||||
logger.Log(0, fmt.Sprintf("failed to create DynSec client: %v", err.Error()))
|
||||
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
err = logic.AssociateNodeToHost(&data.Node, &data.Host)
|
||||
|
@ -1012,12 +988,7 @@ func deleteNode(w http.ResponseWriter, r *http.Request) {
|
|||
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest"))
|
||||
return
|
||||
}
|
||||
host, err := logic.GetHost(node.HostID.String())
|
||||
if err != nil {
|
||||
logger.Log(0, "error retrieving host for node", node.ID.String(), err.Error())
|
||||
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest"))
|
||||
return
|
||||
}
|
||||
|
||||
if r.Header.Get("ismaster") != "yes" {
|
||||
username := r.Header.Get("user")
|
||||
if username != "" && !doesUserOwnNode(username, params["network"], nodeid) {
|
||||
|
@ -1031,16 +1002,9 @@ func deleteNode(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
logic.ReturnSuccessResponse(w, r, nodeid+" deleted.")
|
||||
logger.Log(1, r.Header.Get("user"), "Deleted node", nodeid, "from network", params["network"])
|
||||
if fromNode { // update networks for host mq client
|
||||
currNets := logic.GetHostNetworks(host.ID.String())
|
||||
if len(currNets) > 0 {
|
||||
mq.ModifyClient(&mq.MqClient{
|
||||
ID: host.ID.String(),
|
||||
Text: host.Name,
|
||||
Networks: currNets,
|
||||
})
|
||||
}
|
||||
} else { // notify node change
|
||||
|
||||
if !fromNode {
|
||||
// notify node change
|
||||
runUpdates(&node, false)
|
||||
}
|
||||
go func() { // notify of peer change
|
||||
|
|
6
main.go
6
main.go
|
@ -139,11 +139,6 @@ func startControllers() {
|
|||
logger.Log(0, "error occurred initializing DNS: ", err.Error())
|
||||
}
|
||||
}
|
||||
if servercfg.IsMessageQueueBackend() {
|
||||
if err := mq.Configure(); err != nil {
|
||||
logger.FatalLog("failed to configure MQ: ", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
//Run Rest Server
|
||||
if servercfg.IsRestBackend() {
|
||||
|
@ -193,7 +188,6 @@ func runMessageQueue(wg *sync.WaitGroup) {
|
|||
defer wg.Done()
|
||||
brokerHost, secure := servercfg.GetMessageQueueEndpoint()
|
||||
logger.Log(0, "connecting to mq broker at", brokerHost, "with TLS?", fmt.Sprintf("%v", secure))
|
||||
mq.SetUpAdminClient()
|
||||
mq.SetupMQTT()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
go mq.Keepalive(ctx)
|
||||
|
|
215
mq/dynsec.go
215
mq/dynsec.go
|
@ -1,215 +0,0 @@
|
|||
package mq
|
||||
|
||||
import (
|
||||
"crypto/sha512"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||
"github.com/gravitl/netmaker/functions"
|
||||
"github.com/gravitl/netmaker/logger"
|
||||
"github.com/gravitl/netmaker/logic"
|
||||
"github.com/gravitl/netmaker/netclient/ncutils"
|
||||
"github.com/gravitl/netmaker/servercfg"
|
||||
"golang.org/x/crypto/pbkdf2"
|
||||
)
|
||||
|
||||
// mq client for admin
|
||||
var mqAdminClient mqtt.Client
|
||||
|
||||
const (
|
||||
// constant for client command
|
||||
CreateClientCmd = "createClient"
|
||||
// constant for disable command
|
||||
DisableClientCmd = "disableClient"
|
||||
// constant for delete client command
|
||||
DeleteClientCmd = "deleteClient"
|
||||
// constant for modify client command
|
||||
ModifyClientCmd = "modifyClient"
|
||||
|
||||
// constant for create role command
|
||||
CreateRoleCmd = "createRole"
|
||||
// constant for delete role command
|
||||
DeleteRoleCmd = "deleteRole"
|
||||
|
||||
// constant for admin user name
|
||||
mqAdminUserName = "Netmaker-Admin"
|
||||
// constant for server user name
|
||||
mqNetmakerServerUserName = "Netmaker-Server"
|
||||
// constant for exporter user name
|
||||
mqExporterUserName = "Netmaker-Exporter"
|
||||
|
||||
// DynamicSecSubTopic - constant for dynamic security subscription topic
|
||||
dynamicSecSubTopic = "$CONTROL/dynamic-security/#"
|
||||
// DynamicSecPubTopic - constant for dynamic security subscription topic
|
||||
dynamicSecPubTopic = "$CONTROL/dynamic-security/v1"
|
||||
)
|
||||
|
||||
// struct for dynamic security file
|
||||
type dynJSON struct {
|
||||
Clients []client `json:"clients"`
|
||||
Roles []role `json:"roles"`
|
||||
DefaultAcl defaultAccessAcl `json:"defaultACLAccess"`
|
||||
}
|
||||
|
||||
// struct for client role
|
||||
type clientRole struct {
|
||||
Rolename string `json:"rolename"`
|
||||
}
|
||||
|
||||
// struct for MQ client
|
||||
type client struct {
|
||||
Username string `json:"username"`
|
||||
TextName string `json:"textName"`
|
||||
Password string `json:"password"`
|
||||
Salt string `json:"salt"`
|
||||
Iterations int `json:"iterations"`
|
||||
Roles []clientRole `json:"roles"`
|
||||
}
|
||||
|
||||
// struct for MQ role
|
||||
type role struct {
|
||||
Rolename string `json:"rolename"`
|
||||
Acls []Acl `json:"acls"`
|
||||
}
|
||||
|
||||
// struct for default acls
|
||||
type defaultAccessAcl struct {
|
||||
PublishClientSend bool `json:"publishClientSend"`
|
||||
PublishClientReceive bool `json:"publishClientReceive"`
|
||||
Subscribe bool `json:"subscribe"`
|
||||
Unsubscribe bool `json:"unsubscribe"`
|
||||
}
|
||||
|
||||
// MqDynSecGroup - struct for MQ client group
|
||||
type MqDynSecGroup struct {
|
||||
Groupname string `json:"groupname"`
|
||||
Priority int `json:"priority"`
|
||||
}
|
||||
|
||||
// MqDynSecRole - struct for MQ client role
|
||||
type MqDynSecRole struct {
|
||||
Rolename string `json:"rolename"`
|
||||
Priority int `json:"priority"`
|
||||
}
|
||||
|
||||
// Acl - struct for MQ acls
|
||||
type Acl struct {
|
||||
AclType string `json:"acltype"`
|
||||
Topic string `json:"topic"`
|
||||
Priority int `json:"priority,omitempty"`
|
||||
Allow bool `json:"allow"`
|
||||
}
|
||||
|
||||
// MqDynSecCmd - struct for MQ dynamic security command
|
||||
type MqDynSecCmd struct {
|
||||
Command string `json:"command"`
|
||||
Username string `json:"username"`
|
||||
Password string `json:"password"`
|
||||
RoleName string `json:"rolename,omitempty"`
|
||||
Acls []Acl `json:"acls,omitempty"`
|
||||
Clientid string `json:"clientid"`
|
||||
Textname string `json:"textname"`
|
||||
Textdescription string `json:"textdescription"`
|
||||
Groups []MqDynSecGroup `json:"groups"`
|
||||
Roles []MqDynSecRole `json:"roles"`
|
||||
}
|
||||
|
||||
// MqDynsecPayload - struct for dynamic security command payload
|
||||
type MqDynsecPayload struct {
|
||||
Commands []MqDynSecCmd `json:"commands"`
|
||||
}
|
||||
|
||||
// encodePasswordToPBKDF2 - encodes the given password with PBKDF2 hashing for MQ
|
||||
func encodePasswordToPBKDF2(password string, salt string, iterations int, keyLength int) string {
|
||||
binaryEncoded := pbkdf2.Key([]byte(password), []byte(salt), iterations, keyLength, sha512.New)
|
||||
return base64.StdEncoding.EncodeToString(binaryEncoded)
|
||||
}
|
||||
|
||||
// Configure - configures the dynamic initial configuration for MQ
|
||||
func Configure() error {
|
||||
|
||||
logger.Log(0, "Configuring MQ...")
|
||||
dynConfig := dynConfigInI
|
||||
path := functions.GetNetmakerPath() + ncutils.GetSeparator() + dynamicSecurityFile
|
||||
|
||||
password := servercfg.GetMqAdminPassword()
|
||||
if password == "" {
|
||||
return errors.New("MQ admin password not provided")
|
||||
}
|
||||
if logic.CheckIfFileExists(path) {
|
||||
data, err := os.ReadFile(path)
|
||||
if err == nil {
|
||||
var cfg dynJSON
|
||||
err = json.Unmarshal(data, &cfg)
|
||||
if err == nil {
|
||||
logger.Log(0, "MQ config exists already, So Updating Existing Config...")
|
||||
dynConfig = cfg
|
||||
}
|
||||
}
|
||||
}
|
||||
exporter := false
|
||||
for i, cI := range dynConfig.Clients {
|
||||
if cI.Username == mqAdminUserName || cI.Username == mqNetmakerServerUserName {
|
||||
salt := logic.RandomString(12)
|
||||
hashed := encodePasswordToPBKDF2(password, salt, 101, 64)
|
||||
cI.Password = hashed
|
||||
cI.Iterations = 101
|
||||
cI.Salt = base64.StdEncoding.EncodeToString([]byte(salt))
|
||||
dynConfig.Clients[i] = cI
|
||||
} else if servercfg.Is_EE && cI.Username == mqExporterUserName {
|
||||
exporter = true
|
||||
exporterPassword := servercfg.GetLicenseKey()
|
||||
salt := logic.RandomString(12)
|
||||
hashed := encodePasswordToPBKDF2(exporterPassword, salt, 101, 64)
|
||||
cI.Password = hashed
|
||||
cI.Iterations = 101
|
||||
cI.Salt = base64.StdEncoding.EncodeToString([]byte(salt))
|
||||
dynConfig.Clients[i] = cI
|
||||
}
|
||||
}
|
||||
if servercfg.Is_EE && !exporter {
|
||||
exporterPassword := servercfg.GetLicenseKey()
|
||||
salt := logic.RandomString(12)
|
||||
hashed := encodePasswordToPBKDF2(exporterPassword, salt, 101, 64)
|
||||
exporterMQClient.Password = hashed
|
||||
exporterMQClient.Iterations = 101
|
||||
exporterMQClient.Salt = base64.StdEncoding.EncodeToString([]byte(salt))
|
||||
dynConfig.Clients = append(dynConfig.Clients, exporterMQClient)
|
||||
dynConfig.Roles = append(dynConfig.Roles, exporterMQRole)
|
||||
}
|
||||
data, err := json.MarshalIndent(dynConfig, "", " ")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return os.WriteFile(path, data, 0755)
|
||||
}
|
||||
|
||||
// publishes the message to dynamic security topic
|
||||
func publishEventToDynSecTopic(payload MqDynsecPayload) error {
|
||||
|
||||
d, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var connecterr error
|
||||
if token := mqAdminClient.Publish(dynamicSecPubTopic, 2, false, d); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil {
|
||||
if token.Error() == nil {
|
||||
connecterr = errors.New("connect timeout")
|
||||
} else {
|
||||
connecterr = token.Error()
|
||||
}
|
||||
}
|
||||
return connecterr
|
||||
}
|
||||
|
||||
// watchDynSecTopic - message handler for dynamic security responses
|
||||
func watchDynSecTopic(client mqtt.Client, msg mqtt.Message) {
|
||||
|
||||
logger.Log(1, fmt.Sprintf("----->WatchDynSecTopic Message: %+v", string(msg.Payload())))
|
||||
|
||||
}
|
|
@ -1,102 +0,0 @@
|
|||
package mq
|
||||
|
||||
// MqClient - type for taking in an MQ client's data
|
||||
type MqClient struct {
|
||||
ID string
|
||||
Text string
|
||||
Password string
|
||||
Networks []string
|
||||
}
|
||||
|
||||
// ModifyClient - modifies an existing client's network roles
|
||||
func ModifyClient(client *MqClient) error {
|
||||
|
||||
roles := []MqDynSecRole{
|
||||
{
|
||||
Rolename: HostGenericRole,
|
||||
Priority: -1,
|
||||
},
|
||||
{
|
||||
Rolename: getHostRoleName(client.ID),
|
||||
Priority: -1,
|
||||
},
|
||||
}
|
||||
|
||||
for i := range client.Networks {
|
||||
roles = append(roles, MqDynSecRole{
|
||||
Rolename: client.Networks[i],
|
||||
Priority: -1,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
event := MqDynsecPayload{
|
||||
Commands: []MqDynSecCmd{
|
||||
{
|
||||
Command: ModifyClientCmd,
|
||||
Username: client.ID,
|
||||
Textname: client.Text,
|
||||
Roles: roles,
|
||||
Groups: make([]MqDynSecGroup, 0),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
return publishEventToDynSecTopic(event)
|
||||
}
|
||||
|
||||
// DeleteMqClient - removes a client from the DynSec system
|
||||
func DeleteMqClient(hostID string) error {
|
||||
deleteHostRole(hostID)
|
||||
event := MqDynsecPayload{
|
||||
Commands: []MqDynSecCmd{
|
||||
{
|
||||
Command: DeleteClientCmd,
|
||||
Username: hostID,
|
||||
},
|
||||
},
|
||||
}
|
||||
return publishEventToDynSecTopic(event)
|
||||
}
|
||||
|
||||
// CreateMqClient - creates an MQ DynSec client
|
||||
func CreateMqClient(client *MqClient) error {
|
||||
|
||||
err := createHostRole(client.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
roles := []MqDynSecRole{
|
||||
{
|
||||
Rolename: HostGenericRole,
|
||||
Priority: -1,
|
||||
},
|
||||
{
|
||||
Rolename: getHostRoleName(client.ID),
|
||||
Priority: -1,
|
||||
},
|
||||
}
|
||||
|
||||
for i := range client.Networks {
|
||||
roles = append(roles, MqDynSecRole{
|
||||
Rolename: client.Networks[i],
|
||||
Priority: -1,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
event := MqDynsecPayload{
|
||||
Commands: []MqDynSecCmd{
|
||||
{
|
||||
Command: CreateClientCmd,
|
||||
Username: client.ID,
|
||||
Password: client.Password,
|
||||
Textname: client.Text,
|
||||
Roles: roles,
|
||||
Groups: make([]MqDynSecGroup, 0),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
return publishEventToDynSecTopic(event)
|
||||
}
|
|
@ -1,509 +0,0 @@
|
|||
package mq
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||
"github.com/gravitl/netmaker/servercfg"
|
||||
)
|
||||
|
||||
const (
|
||||
// constant for admin role
|
||||
adminRole = "admin"
|
||||
// constant for server role
|
||||
serverRole = "server"
|
||||
// constant for exporter role
|
||||
exporterRole = "exporter"
|
||||
// constant for node role
|
||||
NodeRole = "node"
|
||||
// HostGenericRole constant for host role
|
||||
HostGenericRole = "host"
|
||||
|
||||
// const for dynamic security file
|
||||
dynamicSecurityFile = "dynamic-security.json"
|
||||
)
|
||||
|
||||
var (
|
||||
// default configuration of dynamic security
|
||||
dynConfigInI = dynJSON{
|
||||
Clients: []client{
|
||||
{
|
||||
Username: mqAdminUserName,
|
||||
TextName: "netmaker admin user",
|
||||
Password: "",
|
||||
Salt: "",
|
||||
Iterations: 0,
|
||||
Roles: []clientRole{
|
||||
{
|
||||
Rolename: adminRole,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Username: mqNetmakerServerUserName,
|
||||
TextName: "netmaker server user",
|
||||
Password: "",
|
||||
Salt: "",
|
||||
Iterations: 0,
|
||||
Roles: []clientRole{
|
||||
{
|
||||
Rolename: serverRole,
|
||||
},
|
||||
},
|
||||
},
|
||||
exporterMQClient,
|
||||
},
|
||||
Roles: []role{
|
||||
{
|
||||
Rolename: adminRole,
|
||||
Acls: fetchAdminAcls(),
|
||||
},
|
||||
{
|
||||
Rolename: serverRole,
|
||||
Acls: fetchServerAcls(),
|
||||
},
|
||||
{
|
||||
Rolename: HostGenericRole,
|
||||
Acls: fetchNodeAcls(),
|
||||
},
|
||||
exporterMQRole,
|
||||
},
|
||||
DefaultAcl: defaultAccessAcl{
|
||||
PublishClientSend: false,
|
||||
PublishClientReceive: true,
|
||||
Subscribe: false,
|
||||
Unsubscribe: true,
|
||||
},
|
||||
}
|
||||
|
||||
exporterMQClient = client{
|
||||
Username: mqExporterUserName,
|
||||
TextName: "netmaker metrics exporter",
|
||||
Password: "",
|
||||
Salt: "",
|
||||
Iterations: 101,
|
||||
Roles: []clientRole{
|
||||
{
|
||||
Rolename: exporterRole,
|
||||
},
|
||||
},
|
||||
}
|
||||
exporterMQRole = role{
|
||||
Rolename: exporterRole,
|
||||
Acls: fetchExporterAcls(),
|
||||
}
|
||||
)
|
||||
|
||||
// DynListCLientsCmdResp - struct for list clients response from MQ
|
||||
type DynListCLientsCmdResp struct {
|
||||
Responses []struct {
|
||||
Command string `json:"command"`
|
||||
Error string `json:"error"`
|
||||
Data ListClientsData `json:"data"`
|
||||
} `json:"responses"`
|
||||
}
|
||||
|
||||
// ListClientsData - struct for list clients data
|
||||
type ListClientsData struct {
|
||||
Clients []string `json:"clients"`
|
||||
TotalCount int `json:"totalCount"`
|
||||
}
|
||||
|
||||
// GetAdminClient - fetches admin client of the MQ
|
||||
func GetAdminClient() (mqtt.Client, error) {
|
||||
opts := mqtt.NewClientOptions()
|
||||
setMqOptions(mqAdminUserName, servercfg.GetMqAdminPassword(), opts)
|
||||
mqclient := mqtt.NewClient(opts)
|
||||
var connecterr error
|
||||
if token := mqclient.Connect(); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil {
|
||||
if token.Error() == nil {
|
||||
connecterr = errors.New("connect timeout")
|
||||
} else {
|
||||
connecterr = token.Error()
|
||||
}
|
||||
}
|
||||
return mqclient, connecterr
|
||||
}
|
||||
|
||||
// ListClients - to list all clients in the MQ
|
||||
func ListClients(client mqtt.Client) (ListClientsData, error) {
|
||||
respChan := make(chan mqtt.Message, 10)
|
||||
defer close(respChan)
|
||||
command := "listClients"
|
||||
resp := ListClientsData{}
|
||||
msg := MqDynsecPayload{
|
||||
Commands: []MqDynSecCmd{
|
||||
{
|
||||
Command: command,
|
||||
},
|
||||
},
|
||||
}
|
||||
client.Subscribe("$CONTROL/dynamic-security/v1/response", 2, mqtt.MessageHandler(func(c mqtt.Client, m mqtt.Message) {
|
||||
respChan <- m
|
||||
}))
|
||||
defer client.Unsubscribe()
|
||||
d, _ := json.Marshal(msg)
|
||||
token := client.Publish("$CONTROL/dynamic-security/v1", 2, true, d)
|
||||
if !token.WaitTimeout(30) || token.Error() != nil {
|
||||
var err error
|
||||
if token.Error() == nil {
|
||||
err = errors.New("connection timeout")
|
||||
} else {
|
||||
err = token.Error()
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
|
||||
for m := range respChan {
|
||||
msg := DynListCLientsCmdResp{}
|
||||
json.Unmarshal(m.Payload(), &msg)
|
||||
for _, mI := range msg.Responses {
|
||||
if mI.Command == command {
|
||||
return mI.Data, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return resp, errors.New("resp not found")
|
||||
}
|
||||
|
||||
// fetches host related acls
|
||||
func fetchHostAcls(hostID string) []Acl {
|
||||
return []Acl{
|
||||
{
|
||||
AclType: "publishClientReceive",
|
||||
Topic: fmt.Sprintf("peers/host/%s/#", hostID),
|
||||
Priority: -1,
|
||||
Allow: true,
|
||||
},
|
||||
{
|
||||
AclType: "publishClientReceive",
|
||||
Topic: fmt.Sprintf("host/update/%s/#", hostID),
|
||||
Priority: -1,
|
||||
Allow: true,
|
||||
},
|
||||
{
|
||||
AclType: "publishClientSend",
|
||||
Topic: fmt.Sprintf("host/serverupdate/%s", hostID),
|
||||
Priority: -1,
|
||||
Allow: true,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// FetchNetworkAcls - fetches network acls
|
||||
func FetchNetworkAcls(network string) []Acl {
|
||||
return []Acl{
|
||||
{
|
||||
AclType: "publishClientReceive",
|
||||
Topic: fmt.Sprintf("update/%s/#", network),
|
||||
Priority: -1,
|
||||
Allow: true,
|
||||
},
|
||||
{
|
||||
AclType: "publishClientReceive",
|
||||
Topic: fmt.Sprintf("peers/%s/#", network),
|
||||
Priority: -1,
|
||||
Allow: true,
|
||||
},
|
||||
{
|
||||
AclType: "publishClientReceive",
|
||||
Topic: fmt.Sprintf("proxy/%s/#", network),
|
||||
Priority: -1,
|
||||
Allow: true,
|
||||
},
|
||||
{
|
||||
AclType: "subscribePattern",
|
||||
Topic: "#",
|
||||
Priority: -1,
|
||||
Allow: true,
|
||||
},
|
||||
{
|
||||
AclType: "unsubscribePattern",
|
||||
Topic: "#",
|
||||
Priority: -1,
|
||||
Allow: true,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// DeleteNetworkRole - deletes a network role from DynSec system
|
||||
func DeleteNetworkRole(network string) error {
|
||||
// Deletes the network role from MQ
|
||||
event := MqDynsecPayload{
|
||||
Commands: []MqDynSecCmd{
|
||||
{
|
||||
Command: DeleteRoleCmd,
|
||||
RoleName: network,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
return publishEventToDynSecTopic(event)
|
||||
}
|
||||
|
||||
func deleteHostRole(hostID string) error {
|
||||
// Deletes the hostID role from MQ
|
||||
event := MqDynsecPayload{
|
||||
Commands: []MqDynSecCmd{
|
||||
{
|
||||
Command: DeleteRoleCmd,
|
||||
RoleName: getHostRoleName(hostID),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
return publishEventToDynSecTopic(event)
|
||||
}
|
||||
|
||||
// CreateNetworkRole - createss a network role from DynSec system
|
||||
func CreateNetworkRole(network string) error {
|
||||
// Create Role with acls for the network
|
||||
event := MqDynsecPayload{
|
||||
Commands: []MqDynSecCmd{
|
||||
{
|
||||
Command: CreateRoleCmd,
|
||||
RoleName: network,
|
||||
Textname: "Network wide role with Acls for nodes",
|
||||
Acls: FetchNetworkAcls(network),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
return publishEventToDynSecTopic(event)
|
||||
}
|
||||
|
||||
// creates role for the host with ID.
|
||||
func createHostRole(hostID string) error {
|
||||
// Create Role with acls for the host
|
||||
event := MqDynsecPayload{
|
||||
Commands: []MqDynSecCmd{
|
||||
{
|
||||
Command: CreateRoleCmd,
|
||||
RoleName: getHostRoleName(hostID),
|
||||
Textname: "host role with Acls for hosts",
|
||||
Acls: fetchHostAcls(hostID),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
return publishEventToDynSecTopic(event)
|
||||
}
|
||||
|
||||
func getHostRoleName(hostID string) string {
|
||||
return fmt.Sprintf("host-%s", hostID)
|
||||
}
|
||||
|
||||
// serverAcls - fetches server role related acls
|
||||
func fetchServerAcls() []Acl {
|
||||
return []Acl{
|
||||
{
|
||||
AclType: "publishClientSend",
|
||||
Topic: "peers/#",
|
||||
Priority: -1,
|
||||
Allow: true,
|
||||
},
|
||||
{
|
||||
AclType: "publishClientSend",
|
||||
Topic: "proxy/#",
|
||||
Priority: -1,
|
||||
Allow: true,
|
||||
},
|
||||
{
|
||||
AclType: "publishClientSend",
|
||||
Topic: "peers/host/#",
|
||||
Priority: -1,
|
||||
Allow: true,
|
||||
},
|
||||
{
|
||||
AclType: "publishClientSend",
|
||||
Topic: "update/#",
|
||||
Priority: -1,
|
||||
Allow: true,
|
||||
},
|
||||
{
|
||||
AclType: "publishClientSend",
|
||||
Topic: "metrics_exporter",
|
||||
Priority: -1,
|
||||
Allow: true,
|
||||
},
|
||||
{
|
||||
AclType: "publishClientSend",
|
||||
Topic: "host/update/#",
|
||||
Priority: -1,
|
||||
Allow: true,
|
||||
},
|
||||
{
|
||||
AclType: "publishClientReceive",
|
||||
Topic: "ping/#",
|
||||
Priority: -1,
|
||||
Allow: true,
|
||||
},
|
||||
{
|
||||
AclType: "publishClientReceive",
|
||||
Topic: "update/#",
|
||||
Priority: -1,
|
||||
Allow: true,
|
||||
},
|
||||
{
|
||||
AclType: "publishClientReceive",
|
||||
Topic: "signal/#",
|
||||
Priority: -1,
|
||||
Allow: true,
|
||||
},
|
||||
{
|
||||
AclType: "publishClientReceive",
|
||||
Topic: "metrics/#",
|
||||
Priority: -1,
|
||||
Allow: true,
|
||||
},
|
||||
{
|
||||
AclType: "subscribePattern",
|
||||
Topic: "#",
|
||||
Priority: -1,
|
||||
Allow: true,
|
||||
},
|
||||
{
|
||||
AclType: "unsubscribePattern",
|
||||
Topic: "#",
|
||||
Priority: -1,
|
||||
Allow: true,
|
||||
},
|
||||
{
|
||||
AclType: "publishClientReceive",
|
||||
Topic: "host/serverupdate/#",
|
||||
Priority: -1,
|
||||
Allow: true,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// fetchNodeAcls - fetches node related acls
|
||||
func fetchNodeAcls() []Acl {
|
||||
// keeping node acls generic as of now.
|
||||
return []Acl{
|
||||
|
||||
{
|
||||
AclType: "publishClientSend",
|
||||
Topic: "signal/#",
|
||||
Priority: -1,
|
||||
Allow: true,
|
||||
},
|
||||
{
|
||||
AclType: "publishClientSend",
|
||||
Topic: "update/#",
|
||||
Priority: -1,
|
||||
Allow: true,
|
||||
},
|
||||
{
|
||||
AclType: "publishClientSend",
|
||||
Topic: "ping/#",
|
||||
Priority: -1,
|
||||
Allow: true,
|
||||
},
|
||||
{
|
||||
AclType: "publishClientSend",
|
||||
Topic: "metrics/#",
|
||||
Priority: -1,
|
||||
Allow: true,
|
||||
},
|
||||
{
|
||||
AclType: "subscribePattern",
|
||||
Topic: "#",
|
||||
Priority: -1,
|
||||
Allow: true,
|
||||
},
|
||||
{
|
||||
AclType: "unsubscribePattern",
|
||||
Topic: "#",
|
||||
Priority: -1,
|
||||
Allow: true,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// fetchExporterAcls - fetch exporter role related acls
|
||||
func fetchExporterAcls() []Acl {
|
||||
return []Acl{
|
||||
{
|
||||
AclType: "publishClientReceive",
|
||||
Topic: "metrics_exporter",
|
||||
Allow: true,
|
||||
Priority: -1,
|
||||
},
|
||||
{
|
||||
AclType: "subscribePattern",
|
||||
Topic: "#",
|
||||
Priority: -1,
|
||||
Allow: true,
|
||||
},
|
||||
{
|
||||
AclType: "unsubscribePattern",
|
||||
Topic: "#",
|
||||
Priority: -1,
|
||||
Allow: true,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// fetchAdminAcls - fetches admin role related acls
|
||||
func fetchAdminAcls() []Acl {
|
||||
return []Acl{
|
||||
{
|
||||
AclType: "publishClientSend",
|
||||
Topic: "$CONTROL/dynamic-security/#",
|
||||
Priority: -1,
|
||||
Allow: true,
|
||||
},
|
||||
{
|
||||
AclType: "publishClientReceive",
|
||||
Topic: "$CONTROL/dynamic-security/#",
|
||||
Priority: -1,
|
||||
Allow: true,
|
||||
},
|
||||
{
|
||||
AclType: "subscribePattern",
|
||||
Topic: "$CONTROL/dynamic-security/#",
|
||||
Priority: -1,
|
||||
Allow: true,
|
||||
},
|
||||
{
|
||||
AclType: "publishClientReceive",
|
||||
Topic: "$SYS/#",
|
||||
Priority: -1,
|
||||
Allow: true,
|
||||
},
|
||||
{
|
||||
AclType: "subscribePattern",
|
||||
Topic: "$SYS/#",
|
||||
Priority: -1,
|
||||
Allow: true,
|
||||
},
|
||||
{
|
||||
AclType: "publishClientReceive",
|
||||
Topic: "#",
|
||||
Priority: -1,
|
||||
Allow: true,
|
||||
},
|
||||
{
|
||||
AclType: "subscribePattern",
|
||||
Topic: "#",
|
||||
Priority: -1,
|
||||
Allow: true,
|
||||
},
|
||||
{
|
||||
AclType: "unsubscribePattern",
|
||||
Topic: "#",
|
||||
Priority: -1,
|
||||
Allow: true,
|
||||
},
|
||||
{
|
||||
AclType: "publishClientSend",
|
||||
Topic: "#",
|
||||
Priority: -1,
|
||||
Allow: true,
|
||||
},
|
||||
}
|
||||
}
|
36
mq/mq.go
36
mq/mq.go
|
@ -2,7 +2,6 @@ package mq
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||
|
@ -23,39 +22,6 @@ var peer_force_send = 0
|
|||
|
||||
var mqclient mqtt.Client
|
||||
|
||||
// SetUpAdminClient - sets up admin client for the MQ
|
||||
func SetUpAdminClient() {
|
||||
opts := mqtt.NewClientOptions()
|
||||
setMqOptions(mqAdminUserName, servercfg.GetMqAdminPassword(), opts)
|
||||
mqAdminClient = mqtt.NewClient(opts)
|
||||
opts.SetOnConnectHandler(func(client mqtt.Client) {
|
||||
if token := client.Subscribe(dynamicSecSubTopic, 2, mqtt.MessageHandler(watchDynSecTopic)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
|
||||
client.Disconnect(240)
|
||||
logger.Log(0, fmt.Sprintf("Dynamic security client subscription failed: %v ", token.Error()))
|
||||
}
|
||||
|
||||
opts.SetOrderMatters(true)
|
||||
opts.SetResumeSubs(true)
|
||||
})
|
||||
tperiod := time.Now().Add(10 * time.Second)
|
||||
for {
|
||||
if token := mqAdminClient.Connect(); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil {
|
||||
logger.Log(2, "Admin: unable to connect to broker, retrying ...")
|
||||
if time.Now().After(tperiod) {
|
||||
if token.Error() == nil {
|
||||
logger.FatalLog("Admin: could not connect to broker, token timeout, exiting ...")
|
||||
} else {
|
||||
logger.FatalLog("Admin: could not connect to broker, exiting ...", token.Error().Error())
|
||||
}
|
||||
}
|
||||
} else {
|
||||
break
|
||||
}
|
||||
time.Sleep(2 * time.Second)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func setMqOptions(user, password string, opts *mqtt.ClientOptions) {
|
||||
broker, _ := servercfg.GetMessageQueueEndpoint()
|
||||
opts.AddBroker(broker)
|
||||
|
@ -73,7 +39,7 @@ func setMqOptions(user, password string, opts *mqtt.ClientOptions) {
|
|||
// SetupMQTT creates a connection to broker and return client
|
||||
func SetupMQTT() {
|
||||
opts := mqtt.NewClientOptions()
|
||||
setMqOptions(mqNetmakerServerUserName, servercfg.GetMqAdminPassword(), opts)
|
||||
setMqOptions(servercfg.GetMqUserName(), servercfg.GetMqPassword(), opts)
|
||||
opts.SetOnConnectHandler(func(client mqtt.Client) {
|
||||
if token := client.Subscribe("ping/#", 2, mqtt.MessageHandler(Ping)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
|
||||
client.Disconnect(240)
|
||||
|
|
|
@ -654,13 +654,24 @@ func GetMQServerPort() string {
|
|||
return port
|
||||
}
|
||||
|
||||
// GetMqAdminPassword - fetches the MQ Admin password
|
||||
func GetMqAdminPassword() string {
|
||||
// GetMqPassword - fetches the MQ password
|
||||
func GetMqPassword() string {
|
||||
password := ""
|
||||
if os.Getenv("MQ_ADMIN_PASSWORD") != "" {
|
||||
password = os.Getenv("MQ_ADMIN_PASSWORD")
|
||||
} else if config.Config.Server.MQAdminPassword != "" {
|
||||
password = config.Config.Server.MQAdminPassword
|
||||
if os.Getenv("MQ_PASSWORD") != "" {
|
||||
password = os.Getenv("MQ_PASSWORD")
|
||||
} else if config.Config.Server.MQPassword != "" {
|
||||
password = config.Config.Server.MQPassword
|
||||
}
|
||||
return password
|
||||
}
|
||||
|
||||
// GetMqUserName - fetches the MQ username
|
||||
func GetMqUserName() string {
|
||||
password := ""
|
||||
if os.Getenv("MQ_USERNAME") != "" {
|
||||
password = os.Getenv("MQ_USERNAME")
|
||||
} else if config.Config.Server.MQUserName != "" {
|
||||
password = config.Config.Server.MQUserName
|
||||
}
|
||||
return password
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue