mirror of
https://github.com/gravitl/netmaker.git
synced 2025-10-24 21:24:35 +08:00
added ability for server to receive and publish host model updates via MQ
This commit is contained in:
parent
63eff85c84
commit
2fa2d50b8f
5 changed files with 87 additions and 12 deletions
|
|
@ -178,6 +178,12 @@ func fetchHostAcls(hostID string) []Acl {
|
|||
Priority: -1,
|
||||
Allow: true,
|
||||
},
|
||||
{
|
||||
AclType: "publishClientReceive",
|
||||
Topic: fmt.Sprintf("host/update/%s/#", hostID),
|
||||
Priority: -1,
|
||||
Allow: true,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -353,6 +359,12 @@ func fetchServerAcls() []Acl {
|
|||
Priority: -1,
|
||||
Allow: true,
|
||||
},
|
||||
{
|
||||
AclType: "publishClientReceive",
|
||||
Topic: "host/update/#",
|
||||
Priority: -1,
|
||||
Allow: true,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -118,6 +118,45 @@ func UpdateNode(client mqtt.Client, msg mqtt.Message) {
|
|||
}()
|
||||
}
|
||||
|
||||
// UpdateHost message Handler -- handles updates from client hosts
|
||||
func UpdateHost(client mqtt.Client, msg mqtt.Message) {
|
||||
go func() {
|
||||
id, err := getID(msg.Topic())
|
||||
if err != nil {
|
||||
logger.Log(1, "error getting host.ID sent on ", msg.Topic(), err.Error())
|
||||
return
|
||||
}
|
||||
currentHost, err := logic.GetHost(id)
|
||||
if err != nil {
|
||||
logger.Log(1, "error getting node ", id, err.Error())
|
||||
return
|
||||
}
|
||||
decrypted, decryptErr := decryptMsgWithHost(currentHost, msg.Payload())
|
||||
if decryptErr != nil {
|
||||
logger.Log(1, "failed to decrypt message for node ", id, decryptErr.Error())
|
||||
return
|
||||
}
|
||||
var newHost models.Host
|
||||
if err := json.Unmarshal(decrypted, &newHost); err != nil {
|
||||
logger.Log(1, "error unmarshaling payload ", err.Error())
|
||||
return
|
||||
}
|
||||
// ifaceDelta := logic.IfaceDelta(¤tHost, newNode)
|
||||
// if servercfg.Is_EE && ifaceDelta {
|
||||
// if err = logic.EnterpriseResetAllPeersFailovers(currentHost.ID.String(), currentHost.Network); err != nil {
|
||||
// logger.Log(1, "failed to reset failover list during node update", currentHost.ID.String(), currentHost.Network)
|
||||
// }
|
||||
// }
|
||||
logic.UpdateHost(&newHost, currentHost)
|
||||
if err := logic.UpsertHost(&newHost); err != nil {
|
||||
logger.Log(1, "error saving host", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
logger.Log(1, "updated host", newHost.ID.String())
|
||||
}()
|
||||
}
|
||||
|
||||
// UpdateMetrics message Handler -- handles updates from client nodes for metrics
|
||||
func UpdateMetrics(client mqtt.Client, msg mqtt.Message) {
|
||||
if servercfg.Is_EE {
|
||||
|
|
|
|||
4
mq/mq.go
4
mq/mq.go
|
|
@ -83,6 +83,10 @@ func SetupMQTT() {
|
|||
client.Disconnect(240)
|
||||
logger.Log(0, "node update subscription failed")
|
||||
}
|
||||
if token := client.Subscribe("host/update/#", 0, mqtt.MessageHandler(UpdateHost)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
|
||||
client.Disconnect(240)
|
||||
logger.Log(0, "host update subscription failed")
|
||||
}
|
||||
if token := client.Subscribe("signal/#", 0, mqtt.MessageHandler(ClientPeerUpdate)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
|
||||
client.Disconnect(240)
|
||||
logger.Log(0, "node client subscription failed")
|
||||
|
|
|
|||
|
|
@ -113,6 +113,26 @@ func NodeUpdate(node *models.Node) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// HostUpdate -- publishes a host topic update
|
||||
func HostUpdate(host *models.Host) error {
|
||||
if !servercfg.IsMessageQueueBackend() {
|
||||
return nil
|
||||
}
|
||||
logger.Log(3, "publishing host update to "+host.ID.String())
|
||||
|
||||
data, err := json.Marshal(host)
|
||||
if err != nil {
|
||||
logger.Log(2, "error marshalling node update ", err.Error())
|
||||
return err
|
||||
}
|
||||
if err = publish(host, fmt.Sprintf("host/update/%s", host.ID.String()), data); err != nil {
|
||||
logger.Log(2, "error publishing host update to", host.ID.String(), err.Error())
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ProxyUpdate -- publishes updates to peers related to proxy
|
||||
func ProxyUpdate(proxyPayload *proxy_models.ProxyManagerPayload, node *models.Node) error {
|
||||
host, err := logic.GetHost(node.HostID.String())
|
||||
|
|
|
|||
24
mq/util.go
24
mq/util.go
|
|
@ -11,15 +11,7 @@ import (
|
|||
"github.com/gravitl/netmaker/netclient/ncutils"
|
||||
)
|
||||
|
||||
func decryptMsg(node *models.Node, msg []byte) ([]byte, error) {
|
||||
if len(msg) <= 24 { // make sure message is of appropriate length
|
||||
return nil, fmt.Errorf("recieved invalid message from broker %v", msg)
|
||||
}
|
||||
host, err := logic.GetHost(node.HostID.String())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func decryptMsgWithHost(host *models.Host, msg []byte) ([]byte, error) {
|
||||
trafficKey, trafficErr := logic.RetrievePrivateTrafficKey() // get server private key
|
||||
if trafficErr != nil {
|
||||
return nil, trafficErr
|
||||
|
|
@ -33,11 +25,19 @@ func decryptMsg(node *models.Node, msg []byte) ([]byte, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
if strings.Contains(host.Version, "0.10.0") {
|
||||
return ncutils.BoxDecrypt(msg, nodePubTKey, serverPrivTKey)
|
||||
return ncutils.DeChunk(msg, nodePubTKey, serverPrivTKey)
|
||||
}
|
||||
|
||||
func decryptMsg(node *models.Node, msg []byte) ([]byte, error) {
|
||||
if len(msg) <= 24 { // make sure message is of appropriate length
|
||||
return nil, fmt.Errorf("recieved invalid message from broker %v", msg)
|
||||
}
|
||||
host, err := logic.GetHost(node.HostID.String())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return ncutils.DeChunk(msg, nodePubTKey, serverPrivTKey)
|
||||
return decryptMsgWithHost(host, msg)
|
||||
}
|
||||
|
||||
func encryptMsg(host *models.Host, msg []byte) ([]byte, error) {
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue